You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/17 01:23:11 UTC
[kafka] branch trunk updated: MINOR: Add ZK dual-write lag metric (#14009)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e50f7cdd37 MINOR: Add ZK dual-write lag metric (#14009)
9e50f7cdd37 is described below
commit 9e50f7cdd37f923cfef4711cf11c1c5271a0a6c7
Author: Hailey Ni <13...@users.noreply.github.com>
AuthorDate: Sun Jul 16 20:23:01 2023 -0500
MINOR: Add ZK dual-write lag metric (#14009)
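This patch adds the ZKWriteBehindLag metric to the KafkaController MBean, as specified in KIP-866. When ZK migration is enabled, the gauge reports how far the last metadata offset written to ZooKeeper during dual-write trails the last committed KRaft record offset (0 until the first dual-write occurs).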
Reviewers: David Arthur <mu...@gmail.com>
---
.../main/scala/kafka/server/ControllerServer.scala | 5 +-
.../apache/kafka/controller/QuorumController.java | 2 +-
.../metrics/QuorumControllerMetrics.java | 29 +++++++++-
.../metadata/migration/KRaftMigrationDriver.java | 12 +++-
.../kafka/controller/QuorumControllerTest.java | 2 +-
.../metrics/QuorumControllerMetricsTest.java | 66 ++++++++++++++++++++--
.../migration/KRaftMigrationDriverTest.java | 44 +++++++++++----
7 files changed, 136 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index c6ec0392ebe..db66b117412 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -214,7 +214,7 @@ class ControllerServer(
val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
- quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time)
+ quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.migrationEnabled)
new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
setTime(time).
@@ -268,7 +268,8 @@ class ControllerServer(
() => {}
),
quorumFeatures,
- configSchema
+ configSchema,
+ quorumControllerMetrics
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index c252afdd6ec..111937b3fa3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -346,7 +346,7 @@ public final class QuorumController implements Controller {
logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
}
if (controllerMetrics == null) {
- controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time);
+ controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time, zkMigrationEnabled);
}
KafkaEventQueue queue = null;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 566245ed096..e267ebdfb9a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -44,6 +44,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
+ private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
+ "KafkaController", "ZKWriteBehindLag");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -58,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
+ private final AtomicLong dualWriteOffset = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
@@ -73,7 +76,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
- Time time
+ Time time,
+ boolean zkMigrationState
) {
this.registry = registry;
this.active = false;
@@ -109,6 +113,18 @@ public class QuorumControllerMetrics implements AutoCloseable {
return time.milliseconds() - lastAppliedRecordTimestamp();
}
}));
+
+ if (zkMigrationState) {
+ registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ // not in dual-write mode: set metric value to 0
+ if (dualWriteOffset() == 0) return 0L;
+ // in dual write mode
+ else return lastCommittedRecordOffset() - dualWriteOffset();
+ }
+ }));
+ }
}
public void setActive(boolean active) {
@@ -151,6 +167,14 @@ public class QuorumControllerMetrics implements AutoCloseable {
return lastAppliedRecordTimestamp.get();
}
+ public void updateDualWriteOffset(long offset) {
+ dualWriteOffset.set(offset);
+ }
+
+ public long dualWriteOffset() {
+ return dualWriteOffset.get();
+ }
+
public void incrementTimedOutHeartbeats() {
timedOutHeartbeats.addAndGet(1);
}
@@ -172,7 +196,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
LAST_APPLIED_RECORD_OFFSET,
LAST_COMMITTED_RECORD_OFFSET,
LAST_APPLIED_RECORD_TIMESTAMP,
- LAST_APPLIED_RECORD_LAG_MS
+ LAST_APPLIED_RECORD_LAG_MS,
+ ZK_WRITE_BEHIND_LAG
).forEach(r::removeMetric));
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index b273a42406b..740c3e4b1dd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -97,6 +98,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private final ZkRecordConsumer zkRecordConsumer;
private final KafkaEventQueue eventQueue;
private final PollTimeSupplier pollTimeSupplier;
+ private final QuorumControllerMetrics controllerMetrics;
private final FaultHandler faultHandler;
private final QuorumFeatures quorumFeatures;
private final RecordRedactor recordRedactor;
@@ -119,6 +121,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema,
+ QuorumControllerMetrics controllerMetrics,
Time time
) {
this.nodeId = nodeId;
@@ -127,6 +130,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.propagator = propagator;
this.time = time;
LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
+ this.controllerMetrics = controllerMetrics;
this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
@@ -149,9 +153,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
- KafkaConfigSchema configSchema
+ KafkaConfigSchema configSchema,
+ QuorumControllerMetrics controllerMetrics
) {
- this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, Time.SYSTEM);
+ this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM);
}
@@ -497,6 +502,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// Persist the offset of the metadata that was written to ZK
ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
+ //update the dual write offset metric
+ controllerMetrics.updateDualWriteOffset(image.highestOffsetAndEpoch().offset());
+
applyMigrationOperation("Updating ZK migration state after " + metadataType,
state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index d55ff5f67c2..be39b9f74ee 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -144,7 +144,7 @@ public class QuorumControllerTest {
final AtomicBoolean closed = new AtomicBoolean(false);
MockControllerMetrics() {
- super(Optional.empty(), Time.SYSTEM);
+ super(Optional.empty(), Time.SYSTEM, false);
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 619100f1ed8..bec023020a4 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -33,11 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class QuorumControllerMetricsTest {
@Test
- public void testMetricNames() {
+ public void testMetricNamesNotInMigrationState() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
new HashSet<>(Arrays.asList(
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
@@ -57,11 +57,37 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testMetricNamesInMigrationState() {
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ try {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
+ new HashSet<>(Arrays.asList(
+ "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+ "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+ "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+ "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+ "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+ "kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
+ )));
+ }
+ ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
+ Collections.emptySet());
+ } finally {
+ registry.shutdown();
+ }
+ }
+
@Test
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
} finally {
@@ -73,7 +99,7 @@ public class QuorumControllerMetricsTest {
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
} finally {
@@ -86,7 +112,7 @@ public class QuorumControllerMetricsTest {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.setLastAppliedRecordOffset(100);
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
@@ -119,6 +145,36 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testUpdateZKWriteBehindLag() {
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ // test zkWriteBehindLag metric when NOT in dual-write mode
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+ metrics.updateDualWriteOffset(0);
+ @SuppressWarnings("unchecked")
+ Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "ZKWriteBehindLag"));
+ assertEquals(0, zkWriteBehindLag.value());
+ } finally {
+ registry.shutdown();
+ }
+
+ // test zkWriteBehindLag metric when in dual-write mode
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+ metrics.updateDualWriteOffset(90);
+ metrics.setLastCommittedRecordOffset(100);
+ @SuppressWarnings("unchecked")
+ Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "ZKWriteBehindLag"));
+ assertEquals(10, zkWriteBehindLag.value());
+ } finally {
+ registry.shutdown();
+ }
+ }
+
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index e530cc63aac..8b42447a022 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
@@ -63,9 +64,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -88,6 +91,22 @@ public class KRaftMigrationDriverTest {
apiVersions,
QuorumFeatures.defaultFeatureMap(),
controllerNodes);
+
+ static class MockControllerMetrics extends QuorumControllerMetrics {
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ MockControllerMetrics() {
+ super(Optional.empty(), Time.SYSTEM, false);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ closed.set(true);
+ }
+ }
+ MockControllerMetrics metrics = new MockControllerMetrics();
+
Time mockTime = new MockTime(1) {
public long nanoseconds() {
// We poll the event for each 1 sec, make it happen for each 10 ms to speed up the test
@@ -216,9 +235,9 @@ public class KRaftMigrationDriverTest {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
-
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -302,6 +321,7 @@ public class KRaftMigrationDriverTest {
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
@@ -348,9 +368,9 @@ public class KRaftMigrationDriverTest {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
-
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -387,15 +407,16 @@ public class KRaftMigrationDriverTest {
new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
- 3000,
- new NoOpRecordConsumer(),
- migrationClient,
- metadataPropagator,
- metadataPublisher -> { },
- faultHandler,
- quorumFeatures,
- KafkaConfigSchema.EMPTY,
- mockTime
+ 3000,
+ new NoOpRecordConsumer(),
+ migrationClient,
+ metadataPropagator,
+ metadataPublisher -> { },
+ faultHandler,
+ quorumFeatures,
+ KafkaConfigSchema.EMPTY,
+ metrics,
+ mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -467,6 +488,7 @@ public class KRaftMigrationDriverTest {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
verifier.verify(driver, migrationClient, topicClient, configClient);