You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/14 21:44:06 UTC
[kafka] branch trunk updated: MINOR Improve logging during the ZK to KRaft migration (#14008)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d9253fed5c0 MINOR Improve logging during the ZK to KRaft migration (#14008)
d9253fed5c0 is described below
commit d9253fed5c022c53a43e986c7d738ca25ef7d92b
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Jul 14 17:44:00 2023 -0400
MINOR Improve logging during the ZK to KRaft migration (#14008)
* Adds an exponential backoff to 1m while the controller is waiting for brokers to show up
* Increases one-time logs to INFO
* Adds a summary of the migration records
* Use RecordRedactor for summary of migration batches (TRACE only)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../main/scala/kafka/server/ControllerServer.scala | 3 +-
.../server/KafkaServerKRaftRegistrationTest.scala | 23 +++--
.../metadata/migration/KRaftMigrationDriver.java | 106 ++++++++++---------
.../metadata/migration/MigrationManifest.java | 112 +++++++++++++++++++++
.../migration/KRaftMigrationDriverTest.java | 6 ++
5 files changed, 194 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 36d8f5eca1e..c6ec0392ebe 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -267,7 +267,8 @@ class ControllerServer(
fatal = false,
() => {}
),
- quorumFeatures
+ quorumFeatures,
+ configSchema
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 367d81d8c78..dd9b57857a4 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
@@ -44,7 +44,12 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class KafkaServerKRaftRegistrationTest {
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+ @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+ new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+ new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ ))
def testRegisterZkBrokerInKraft(zkCluster: ClusterInstance): Unit = {
val clusterId = zkCluster.clusterId()
@@ -69,7 +74,7 @@ class KafkaServerKRaftRegistrationTest {
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
- zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
zkCluster.waitForReadyBrokers()
@@ -81,8 +86,7 @@ class KafkaServerKRaftRegistrationTest {
case t: Throwable => fail("Had some other error waiting for brokers", t)
}
} finally {
- kraftCluster.close()
- zkCluster.stop()
+ shutdownInSequence(zkCluster, kraftCluster)
}
}
@@ -112,8 +116,13 @@ class KafkaServerKRaftRegistrationTest {
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
} finally {
- kraftCluster.close()
- zkCluster.stop()
+ shutdownInSequence(zkCluster, kraftCluster)
}
}
+
+ def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
+ zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
+ kraftCluster.close()
+ zkCluster.stop()
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 9cb788cc5db..b273a42406b 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -16,8 +16,7 @@
*/
package org.apache.kafka.metadata.migration;
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
@@ -28,17 +27,17 @@ import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LoaderManifestType;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
import org.slf4j.Logger;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
@@ -48,19 +47,38 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
/**
* This class orchestrates and manages the state related to a ZK to KRaft migration. A single event thread is used to
* serialize events coming from various threads and listeners.
*/
public class KRaftMigrationDriver implements MetadataPublisher {
+
+ private static class PollTimeSupplier {
+ private final ExponentialBackoff pollBackoff;
+ private long pollCount;
+
+ PollTimeSupplier() {
+ this.pollCount = 0;
+ this.pollBackoff = new ExponentialBackoff(100, 2, 60000, 0.02);
+ }
+
+ void reset() {
+ this.pollCount = 0;
+ }
+
+ public long nextPollTimeMs() {
+ long next = pollBackoff.backoff(pollCount);
+ pollCount++;
+ return next;
+ }
+ }
+
private final static Consumer<Throwable> NO_OP_HANDLER = ex -> { };
/**
@@ -78,8 +96,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private final LegacyPropagator propagator;
private final ZkRecordConsumer zkRecordConsumer;
private final KafkaEventQueue eventQueue;
+ private final PollTimeSupplier pollTimeSupplier;
private final FaultHandler faultHandler;
private final QuorumFeatures quorumFeatures;
+ private final RecordRedactor recordRedactor;
/**
* A callback for when the migration state has been recovered from ZK. This is used to delay the installation of this
* MetadataPublisher with MetadataLoader.
@@ -98,6 +118,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
+ KafkaConfigSchema configSchema,
Time time
) {
this.nodeId = nodeId;
@@ -110,12 +131,14 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
+ this.pollTimeSupplier = new PollTimeSupplier();
this.image = MetadataImage.EMPTY;
this.firstPublish = false;
this.initialZkLoadHandler = initialZkLoadHandler;
this.faultHandler = faultHandler;
this.quorumFeatures = quorumFeatures;
this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient);
+ this.recordRedactor = new RecordRedactor(configSchema);
}
public KRaftMigrationDriver(
@@ -125,9 +148,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
- QuorumFeatures quorumFeatures
+ QuorumFeatures quorumFeatures,
+ KafkaConfigSchema configSchema
) {
- this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, Time.SYSTEM);
+ this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, Time.SYSTEM);
}
@@ -158,7 +182,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private boolean isControllerQuorumReadyForMigration() {
Optional<String> notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady();
if (notReadyMsg.isPresent()) {
- log.info("Still waiting for all controller nodes ready to begin the migration. due to:" + notReadyMsg.get());
+ log.warn("Still waiting for all controller nodes ready to begin the migration. Not ready due to:" + notReadyMsg.get());
return false;
}
return true;
@@ -287,6 +311,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
if (newState != migrationState) {
log.debug("{} transitioning from {} to {} state", nodeId, migrationState, newState);
+ pollTimeSupplier.reset();
+ wakeup();
} else {
log.trace("{} transitioning from {} to {} state", nodeId, migrationState, newState);
}
@@ -294,6 +320,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
migrationState = newState;
}
+ private void wakeup() {
+ eventQueue.append(new PollEvent());
+ }
+
// MetadataPublisher methods
@Override
@@ -440,6 +470,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
log.trace("Received metadata {}, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper", metadataType);
completionHandler.accept(null);
+ wakeup();
return;
}
@@ -510,7 +541,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
case PRE_MIGRATION:
if (isControllerQuorumReadyForMigration()) {
// Base case when starting the migration
- log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
+ log.info("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
}
break;
@@ -520,7 +551,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
transitionTo(MigrationDriverState.INACTIVE);
} else {
// Base case when rebooting a controller during migration
- log.debug("Migration is in already progress, not waiting on ZK brokers.");
+ log.info("Migration is in already progress, not waiting on ZK brokers.");
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
break;
@@ -539,7 +570,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
switch (migrationState) {
case WAIT_FOR_BROKERS:
if (areZkBrokersReadyForMigration()) {
- log.debug("Zk brokers are registered and ready for migration");
+ log.info("Zk brokers are registered and ready for migration");
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
break;
@@ -556,7 +587,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
- log.debug("Unable to claim leadership, will retry until we learn of a different KRaft leader");
+ log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
} else {
if (!migrationLeadershipState.initialZkMigrationComplete()) {
transitionTo(MigrationDriverState.ZK_MIGRATION);
@@ -573,22 +604,23 @@ public class KRaftMigrationDriver implements MetadataPublisher {
public void run() throws Exception {
Set<Integer> brokersInMetadata = new HashSet<>();
log.info("Starting ZK migration");
+ MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
zkRecordConsumer.beginMigration();
try {
- AtomicInteger count = new AtomicInteger(0);
zkMigrationClient.readAllMetadata(batch -> {
try {
+ log.info("Migrating {} records from ZK", batch.size());
if (log.isTraceEnabled()) {
- log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch));
- } else {
- log.info("Migrating {} records from ZK", batch.size());
+ batch.forEach(apiMessageAndVersion ->
+ log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
}
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
"the metadata layer to commit migration record batch",
future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
- count.addAndGet(batch.size());
+ manifestBuilder.acceptBatch(batch);
} catch (Throwable e) {
+ // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
throw new RuntimeException(e);
}
}, brokersInMetadata::add);
@@ -597,10 +629,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
KRaftMigrationDriver.this.log, "",
"the metadata layer to complete the migration",
completeMigrationFuture, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
- log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " +
- "generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
+ MigrationManifest manifest = manifestBuilder.build();
+ log.info("Completed migration of metadata from ZooKeeper to KRaft. {}. " +
+ "The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
"migrated metadata {}.",
- count.get(),
+ manifest,
offsetAndEpochAfterMigration.offset(),
offsetAndEpochAfterMigration.epoch(),
brokersInMetadata.size(),
@@ -615,6 +648,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
} catch (Throwable t) {
zkRecordConsumer.abortMigration();
+ MigrationManifest partialManifest = manifestBuilder.build();
+ log.error("Aborted metadata migration from ZooKeeper to KRaft. {}.", partialManifest);
super.handleException(t);
}
}
@@ -690,7 +725,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
// Poll again after some time
- long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
+ long deadline = time.nanoseconds() + NANOSECONDS.convert(pollTimeSupplier.nextPollTimeMs(), MILLISECONDS);
eventQueue.scheduleDeferred(
"poll",
new EventQueue.DeadlineFunction(deadline),
@@ -698,31 +733,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
}
}
- static String recordBatchToString(Collection<ApiMessageAndVersion> batch) {
- String batchString = batch.stream().map(apiMessageAndVersion -> {
- if (apiMessageAndVersion.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id()) {
- StringBuilder sb = new StringBuilder();
- sb.append("ApiMessageAndVersion(");
- ConfigRecord record = (ConfigRecord) apiMessageAndVersion.message();
- sb.append("ConfigRecord(");
- sb.append("resourceType=");
- sb.append(record.resourceType());
- sb.append(", resourceName=");
- sb.append(record.resourceName());
- sb.append(", name=");
- sb.append(record.name());
- sb.append(")");
- sb.append(" at version ");
- sb.append(apiMessageAndVersion.version());
- sb.append(")");
- return sb.toString();
- } else {
- return apiMessageAndVersion.toString();
- }
- }).collect(Collectors.joining(","));
- return "[" + batchString + "]";
- }
-
static KRaftMigrationOperationConsumer countingOperationConsumer(
Map<String, Integer> dualWriteCounts,
BiConsumer<String, KRaftMigrationOperation> operationConsumer
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
new file mode 100644
index 00000000000..7d174b15e68
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holds the results of a ZK to KRaft metadata migration. The {@link #toString()} can be used to provide a
+ * human-readable summary of the migration.
+ */
+public class MigrationManifest {
+
+ public static class Builder {
+ private final Time time;
+ private final long startTimeNanos;
+ private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
+ private int batches = 0;
+ private int total = 0;
+ private long endTimeNanos = 0;
+
+ Builder(Time time) {
+ this.time = time;
+ this.startTimeNanos = time.nanoseconds();
+ }
+
+ public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+ batches++;
+ recordBatch.forEach(apiMessageAndVersion -> {
+ MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
+ counts.merge(type, 1, (__, count) -> count + 1);
+ total++;
+ });
+ }
+
+ public MigrationManifest build() {
+ if (endTimeNanos == 0) {
+ endTimeNanos = time.nanoseconds();
+ }
+ return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
+ }
+ }
+
+ private final int totalRecords;
+ private final int totalBatches;
+ private final long durationNanos;
+ private final Map<MetadataRecordType, Integer> recordTypeCounts;
+
+ MigrationManifest(
+ int totalRecords,
+ int totalBatches,
+ long durationNanos,
+ Map<MetadataRecordType, Integer> recordTypeCounts
+ ) {
+ this.totalRecords = totalRecords;
+ this.totalBatches = totalBatches;
+ this.durationNanos = durationNanos;
+ this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
+ }
+
+ public static Builder newBuilder(Time time) {
+ return new Builder(time);
+ }
+
+ public long durationMs() {
+ return TimeUnit.NANOSECONDS.toMillis(durationNanos);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ MigrationManifest that = (MigrationManifest) o;
+ return totalRecords == that.totalRecords &&
+ totalBatches == that.totalBatches &&
+ durationNanos == that.durationNanos &&
+ recordTypeCounts.equals(that.recordTypeCounts);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+ }
+
+ public String toString() {
+ return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
+ totalRecords, durationMs(), totalBatches, recordTypeCounts);
+ }
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 3b13e065668..e530cc63aac 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
+import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
@@ -214,6 +215,7 @@ public class KRaftMigrationDriverTest {
metadataPublisher -> { },
new MockFaultHandler("test"),
quorumFeatures,
+ KafkaConfigSchema.EMPTY,
mockTime
)) {
@@ -299,6 +301,7 @@ public class KRaftMigrationDriverTest {
metadataPublisher -> { },
faultHandler,
quorumFeatures,
+ KafkaConfigSchema.EMPTY,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
@@ -344,6 +347,7 @@ public class KRaftMigrationDriverTest {
},
new MockFaultHandler("test"),
quorumFeatures,
+ KafkaConfigSchema.EMPTY,
mockTime
)) {
@@ -390,6 +394,7 @@ public class KRaftMigrationDriverTest {
metadataPublisher -> { },
faultHandler,
quorumFeatures,
+ KafkaConfigSchema.EMPTY,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
@@ -461,6 +466,7 @@ public class KRaftMigrationDriverTest {
metadataPublisher -> { },
new MockFaultHandler("test"),
quorumFeatures,
+ KafkaConfigSchema.EMPTY,
mockTime
)) {
verifier.verify(driver, migrationClient, topicClient, configClient);