Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/17 01:23:11 UTC

[kafka] branch trunk updated: MINOR: Add ZK dual-write lag metric (#14009)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e50f7cdd37 MINOR: Add ZK dual-write lag metric (#14009)
9e50f7cdd37 is described below

commit 9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7
Author: Hailey Ni <13...@users.noreply.github.com>
AuthorDate: Sun Jul 16 20:23:01 2023 -0500

    MINOR: Add ZK dual-write lag metric (#14009)
    
    This patch adds the ZKWriteBehindLag metric to the KafkaController MBean, as specified in KIP-866.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../main/scala/kafka/server/ControllerServer.scala |  5 +-
 .../apache/kafka/controller/QuorumController.java  |  2 +-
 .../metrics/QuorumControllerMetrics.java           | 29 +++++++++-
 .../metadata/migration/KRaftMigrationDriver.java   | 12 +++-
 .../kafka/controller/QuorumControllerTest.java     |  2 +-
 .../metrics/QuorumControllerMetricsTest.java       | 66 ++++++++++++++++++++--
 .../migration/KRaftMigrationDriverTest.java        | 44 +++++++++++----
 7 files changed, 136 insertions(+), 24 deletions(-)
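
For operators, the new gauge can be read like any other controller metric over JMX. The sketch below is an illustration, not part of this patch: it assumes the code runs in (or is connected to) the controller JVM, that migration mode is enabled so the gauge is registered, and that the Yammer JMX reporter exposes the gauge under its usual "Value" attribute.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class ZkWriteBehindLagReader {
        public static void main(String[] args) throws Exception {
            // Local platform MBean server; a remote controller would need a JMXConnector instead.
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // MBean name as registered by QuorumControllerMetrics in this patch.
            ObjectName name = new ObjectName(
                "kafka.controller:type=KafkaController,name=ZKWriteBehindLag");
            // 0 means the controller is not currently dual-writing; otherwise the
            // value is lastCommittedRecordOffset - dualWriteOffset.
            Number lag = (Number) server.getAttribute(name, "Value");
            System.out.println("ZKWriteBehindLag = " + lag.longValue());
        }
    }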

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index c6ec0392ebe..db66b117412 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -214,7 +214,7 @@ class ControllerServer(
 
         val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
 
-        quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time)
+        quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.migrationEnabled)
 
         new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
           setTime(time).
@@ -268,7 +268,8 @@ class ControllerServer(
             () => {}
           ),
           quorumFeatures,
-          configSchema
+          configSchema,
+          quorumControllerMetrics
         )
         migrationDriver.start()
         migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index c252afdd6ec..111937b3fa3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -346,7 +346,7 @@ public final class QuorumController implements Controller {
                 logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
             }
             if (controllerMetrics == null) {
-                controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time);
+                controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time, zkMigrationEnabled);
             }
 
             KafkaEventQueue queue = null;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 566245ed096..e267ebdfb9a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -44,6 +44,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
         "ControllerEventManager", "EventQueueTimeMs");
     private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
         "ControllerEventManager", "EventQueueProcessingTimeMs");
+    private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
+            "KafkaController", "ZKWriteBehindLag");
     private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -58,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
     private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
+    private final AtomicLong dualWriteOffset = new AtomicLong(0);
     private final Consumer<Long> eventQueueTimeUpdater;
     private final Consumer<Long> eventQueueProcessingTimeUpdater;
     private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
@@ -73,7 +76,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
 
     public QuorumControllerMetrics(
         Optional<MetricsRegistry> registry,
-        Time time
+        Time time,
+        boolean zkMigrationState
     ) {
         this.registry = registry;
         this.active = false;
@@ -109,6 +113,18 @@ public class QuorumControllerMetrics implements AutoCloseable {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
+
+        if (zkMigrationState) {
+            registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
+                @Override
+                public Long value() {
+                    // not in dual-write mode: set metric value to 0
+                    if (dualWriteOffset() == 0) return 0L;
+                    // in dual write mode
+                    else return lastCommittedRecordOffset() - dualWriteOffset();
+                }
+            }));
+        }
     }
 
     public void setActive(boolean active) {
@@ -151,6 +167,14 @@ public class QuorumControllerMetrics implements AutoCloseable {
         return lastAppliedRecordTimestamp.get();
     }
 
+    public void updateDualWriteOffset(long offset) {
+        dualWriteOffset.set(offset);
+    }
+
+    public long dualWriteOffset() {
+        return dualWriteOffset.get();
+    }
+
     public void incrementTimedOutHeartbeats() {
         timedOutHeartbeats.addAndGet(1);
     }
@@ -172,7 +196,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
             LAST_APPLIED_RECORD_OFFSET,
             LAST_COMMITTED_RECORD_OFFSET,
             LAST_APPLIED_RECORD_TIMESTAMP,
-            LAST_APPLIED_RECORD_LAG_MS
+            LAST_APPLIED_RECORD_LAG_MS,
+            ZK_WRITE_BEHIND_LAG
         ).forEach(r::removeMetric));
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index b273a42406b..740c3e4b1dd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.QuorumFeatures;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -97,6 +98,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private final ZkRecordConsumer zkRecordConsumer;
     private final KafkaEventQueue eventQueue;
     private final PollTimeSupplier pollTimeSupplier;
+    private final QuorumControllerMetrics controllerMetrics;
     private final FaultHandler faultHandler;
     private final QuorumFeatures quorumFeatures;
     private final RecordRedactor recordRedactor;
@@ -119,6 +121,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         FaultHandler faultHandler,
         QuorumFeatures quorumFeatures,
         KafkaConfigSchema configSchema,
+        QuorumControllerMetrics controllerMetrics,
         Time time
     ) {
         this.nodeId = nodeId;
@@ -127,6 +130,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.propagator = propagator;
         this.time = time;
         LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
+        this.controllerMetrics = controllerMetrics;
         this.log = logContext.logger(KRaftMigrationDriver.class);
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
@@ -149,9 +153,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         Consumer<MetadataPublisher> initialZkLoadHandler,
         FaultHandler faultHandler,
         QuorumFeatures quorumFeatures,
-        KafkaConfigSchema configSchema
+        KafkaConfigSchema configSchema,
+        QuorumControllerMetrics controllerMetrics
     ) {
-        this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, Time.SYSTEM);
+        this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM);
     }
 
 
@@ -497,6 +502,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             // Persist the offset of the metadata that was written to ZK
             ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
+            // Update the dual-write offset metric
+            controllerMetrics.updateDualWriteOffset(image.highestOffsetAndEpoch().offset());
+
             applyMigrationOperation("Updating ZK migration state after " + metadataType,
                     state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index d55ff5f67c2..be39b9f74ee 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -144,7 +144,7 @@ public class QuorumControllerTest {
         final AtomicBoolean closed = new AtomicBoolean(false);
 
         MockControllerMetrics() {
-            super(Optional.empty(), Time.SYSTEM);
+            super(Optional.empty(), Time.SYSTEM, false);
         }
 
         @Override
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 619100f1ed8..bec023020a4 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -33,11 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class QuorumControllerMetricsTest {
     @Test
-    public void testMetricNames() {
+    public void testMetricNamesNotInMigrationState() {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
         try {
-            try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+            try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
                 ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
                     new HashSet<>(Arrays.asList(
                         "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
@@ -57,11 +57,37 @@ public class QuorumControllerMetricsTest {
         }
     }
 
+    @Test
+    public void testMetricNamesInMigrationState() {
+        MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
+        try {
+            try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
+                    new HashSet<>(Arrays.asList(
+                        "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+                        "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+                        "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+                        "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                        "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+                        "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+                        "kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
+                        )));
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
+                    Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     @Test
     public void testUpdateEventQueueTime() {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
-        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
             metrics.updateEventQueueTime(1000);
             assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
         } finally {
@@ -73,7 +99,7 @@ public class QuorumControllerMetricsTest {
     public void testUpdateEventQueueProcessingTime() {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
-        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
             metrics.updateEventQueueProcessingTime(1000);
             assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
         } finally {
@@ -86,7 +112,7 @@ public class QuorumControllerMetricsTest {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
         time.sleep(1000);
-        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
             metrics.setLastAppliedRecordOffset(100);
             metrics.setLastAppliedRecordTimestamp(500);
             metrics.setLastCommittedRecordOffset(50);
@@ -119,6 +145,36 @@ public class QuorumControllerMetricsTest {
         }
     }
 
+    @Test
+    public void testUpdateZKWriteBehindLag() {
+        MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
+        // test zkWriteBehindLag metric when NOT in dual-write mode
+        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+            metrics.updateDualWriteOffset(0);
+            @SuppressWarnings("unchecked")
+            Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", "ZKWriteBehindLag"));
+            assertEquals(0, zkWriteBehindLag.value());
+        } finally {
+            registry.shutdown();
+        }
+
+        // test zkWriteBehindLag metric when in dual-write mode
+        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+            metrics.updateDualWriteOffset(90);
+            metrics.setLastCommittedRecordOffset(100);
+            @SuppressWarnings("unchecked")
+            Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", "ZKWriteBehindLag"));
+            assertEquals(10, zkWriteBehindLag.value());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
         Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index e530cc63aac..8b42447a022 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.QuorumFeatures;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.image.AclsImage;
 import org.apache.kafka.image.ClientQuotasImage;
 import org.apache.kafka.image.ClusterImage;
@@ -63,9 +64,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -88,6 +91,22 @@ public class KRaftMigrationDriverTest {
         apiVersions,
         QuorumFeatures.defaultFeatureMap(),
         controllerNodes);
+
+    static class MockControllerMetrics extends QuorumControllerMetrics {
+        final AtomicBoolean closed = new AtomicBoolean(false);
+
+        MockControllerMetrics() {
+            super(Optional.empty(), Time.SYSTEM, false);
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            closed.set(true);
+        }
+    }
+    MockControllerMetrics metrics = new MockControllerMetrics();
+
     Time mockTime = new MockTime(1) {
         public long nanoseconds() {
             // We poll the event for each 1 sec, make it happen for each 10 ms to speed up the test
@@ -216,9 +235,9 @@ public class KRaftMigrationDriverTest {
             new MockFaultHandler("test"),
             quorumFeatures,
             KafkaConfigSchema.EMPTY,
+            metrics,
             mockTime
         )) {
-
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
 
@@ -302,6 +321,7 @@ public class KRaftMigrationDriverTest {
             faultHandler,
             quorumFeatures,
             KafkaConfigSchema.EMPTY,
+            metrics,
             mockTime
         )) {
             MetadataImage image = MetadataImage.EMPTY;
@@ -348,9 +368,9 @@ public class KRaftMigrationDriverTest {
             new MockFaultHandler("test"),
             quorumFeatures,
             KafkaConfigSchema.EMPTY,
+            metrics,
             mockTime
         )) {
-
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
 
@@ -387,15 +407,16 @@ public class KRaftMigrationDriverTest {
             new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
         MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
         try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
-                3000,
-                new NoOpRecordConsumer(),
-                migrationClient,
-                metadataPropagator,
-                metadataPublisher -> { },
-                faultHandler,
-                quorumFeatures,
-                KafkaConfigSchema.EMPTY,
-                mockTime
+            3000,
+            new NoOpRecordConsumer(),
+            migrationClient,
+            metadataPropagator,
+            metadataPublisher -> { },
+            faultHandler,
+            quorumFeatures,
+            KafkaConfigSchema.EMPTY,
+            metrics,
+            mockTime
         )) {
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
@@ -467,6 +488,7 @@ public class KRaftMigrationDriverTest {
             new MockFaultHandler("test"),
             quorumFeatures,
             KafkaConfigSchema.EMPTY,
+            metrics,
             mockTime
         )) {
             verifier.verify(driver, migrationClient, topicClient, configClient);