Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/14 21:44:06 UTC

[kafka] branch trunk updated: MINOR Improve logging during the ZK to KRaft migration (#14008)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9253fed5c0 MINOR Improve logging during the ZK to KRaft migration (#14008)
d9253fed5c0 is described below

commit d9253fed5c022c53a43e986c7d738ca25ef7d92b
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Jul 14 17:44:00 2023 -0400

    MINOR Improve logging during the ZK to KRaft migration (#14008)
    
    * Adds an exponential backoff to 1m while the controller is waiting for brokers to show up
    * Increases one-time logs to INFO
    * Adds a summary of the migration records
    * Use RecordRedactor for summary of migration batches (TRACE only)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../main/scala/kafka/server/ControllerServer.scala |   3 +-
 .../server/KafkaServerKRaftRegistrationTest.scala  |  23 +++--
 .../metadata/migration/KRaftMigrationDriver.java   | 106 ++++++++++---------
 .../metadata/migration/MigrationManifest.java      | 112 +++++++++++++++++++++
 .../migration/KRaftMigrationDriverTest.java        |   6 ++
 5 files changed, 194 insertions(+), 56 deletions(-)
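
A quick illustration of the first bullet above: a minimal sketch (not part of this patch) of the poll schedule the new PollTimeSupplier produces, assuming only the ExponentialBackoff parameters visible in the KRaftMigrationDriver diff below (100ms initial interval, doubling, 60s cap, 2% jitter).

    import org.apache.kafka.common.utils.ExponentialBackoff;

    public class BackoffDemo {
        public static void main(String[] args) {
            // Same parameters the migration driver now uses while polling.
            ExponentialBackoff backoff = new ExponentialBackoff(100, 2, 60000, 0.02);
            for (long attempt = 0; attempt < 12; attempt++) {
                // Prints roughly 100, 200, 400, ... ms, capped near 60000;
                // the 2% jitter perturbs each value slightly.
                System.out.println(backoff.backoff(attempt));
            }
        }
    }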

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 36d8f5eca1e..c6ec0392ebe 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -267,7 +267,8 @@ class ControllerServer(
             fatal = false,
             () => {}
           ),
-          quorumFeatures
+          quorumFeatures,
+          configSchema
         )
         migrationDriver.start()
         migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 367d81d8c78..dd9b57857a4 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
@@ -44,7 +44,12 @@ import scala.jdk.CollectionConverters._
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class KafkaServerKRaftRegistrationTest {
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+  ))
   def testRegisterZkBrokerInKraft(zkCluster: ClusterInstance): Unit = {
     val clusterId = zkCluster.clusterId()
 
@@ -69,7 +74,7 @@ class KafkaServerKRaftRegistrationTest {
       zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
       zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
       zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
       zkCluster.rollingBrokerRestart()
       zkCluster.waitForReadyBrokers()
 
@@ -81,8 +86,7 @@ class KafkaServerKRaftRegistrationTest {
         case t: Throwable => fail("Had some other error waiting for brokers", t)
       }
     } finally {
-      kraftCluster.close()
-      zkCluster.stop()
+      shutdownInSequence(zkCluster, kraftCluster)
     }
   }
 
@@ -112,8 +116,13 @@ class KafkaServerKRaftRegistrationTest {
       zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
       assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
     } finally {
-      kraftCluster.close()
-      zkCluster.stop()
+      shutdownInSequence(zkCluster, kraftCluster)
     }
   }
+
+  def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
+    zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
+    kraftCluster.close()
+    zkCluster.stop()
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 9cb788cc5db..b273a42406b 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -16,8 +16,7 @@
  */
 package org.apache.kafka.metadata.migration;
 
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.QuorumFeatures;
@@ -28,17 +27,17 @@ import org.apache.kafka.image.loader.LoaderManifest;
 import org.apache.kafka.image.loader.LoaderManifestType;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.util.RecordRedactor;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.util.Deadline;
 import org.apache.kafka.server.util.FutureUtils;
 import org.slf4j.Logger;
 
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Map;
@@ -48,19 +47,38 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  * This class orchestrates and manages the state related to a ZK to KRaft migration. A single event thread is used to
  * serialize events coming from various threads and listeners.
  */
 public class KRaftMigrationDriver implements MetadataPublisher {
+
+    private static class PollTimeSupplier {
+        private final ExponentialBackoff pollBackoff;
+        private long pollCount;
+
+        PollTimeSupplier() {
+            this.pollCount = 0;
+            this.pollBackoff = new ExponentialBackoff(100, 2, 60000, 0.02);
+        }
+
+        void reset() {
+            this.pollCount = 0;
+        }
+
+        public long nextPollTimeMs() {
+            long next = pollBackoff.backoff(pollCount);
+            pollCount++;
+            return next;
+        }
+    }
+
     private final static Consumer<Throwable> NO_OP_HANDLER = ex -> { };
 
     /**
@@ -78,8 +96,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private final LegacyPropagator propagator;
     private final ZkRecordConsumer zkRecordConsumer;
     private final KafkaEventQueue eventQueue;
+    private final PollTimeSupplier pollTimeSupplier;
     private final FaultHandler faultHandler;
     private final QuorumFeatures quorumFeatures;
+    private final RecordRedactor recordRedactor;
     /**
      * A callback for when the migration state has been recovered from ZK. This is used to delay the installation of this
      * MetadataPublisher with MetadataLoader.
@@ -98,6 +118,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         Consumer<MetadataPublisher> initialZkLoadHandler,
         FaultHandler faultHandler,
         QuorumFeatures quorumFeatures,
+        KafkaConfigSchema configSchema,
         Time time
     ) {
         this.nodeId = nodeId;
@@ -110,12 +131,14 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
         this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
+        this.pollTimeSupplier = new PollTimeSupplier();
         this.image = MetadataImage.EMPTY;
         this.firstPublish = false;
         this.initialZkLoadHandler = initialZkLoadHandler;
         this.faultHandler = faultHandler;
         this.quorumFeatures = quorumFeatures;
         this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient);
+        this.recordRedactor = new RecordRedactor(configSchema);
     }
 
     public KRaftMigrationDriver(
@@ -125,9 +148,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         LegacyPropagator propagator,
         Consumer<MetadataPublisher> initialZkLoadHandler,
         FaultHandler faultHandler,
-        QuorumFeatures quorumFeatures
+        QuorumFeatures quorumFeatures,
+        KafkaConfigSchema configSchema
     ) {
-        this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, Time.SYSTEM);
+        this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, Time.SYSTEM);
     }
 
 
@@ -158,7 +182,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private boolean isControllerQuorumReadyForMigration() {
         Optional<String> notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady();
         if (notReadyMsg.isPresent()) {
-            log.info("Still waiting for all controller nodes ready to begin the migration. due to:" + notReadyMsg.get());
+            log.warn("Still waiting for all controller nodes ready to begin the migration. Not ready due to:" + notReadyMsg.get());
             return false;
         }
         return true;
@@ -287,6 +311,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
 
         if (newState != migrationState) {
             log.debug("{} transitioning from {} to {} state", nodeId, migrationState, newState);
+            pollTimeSupplier.reset();
+            wakeup();
         } else {
             log.trace("{} transitioning from {} to {} state", nodeId, migrationState, newState);
         }
@@ -294,6 +320,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         migrationState = newState;
     }
 
+    private void wakeup() {
+        eventQueue.append(new PollEvent());
+    }
+
     // MetadataPublisher methods
 
     @Override
@@ -440,6 +470,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 log.trace("Received metadata {}, but the controller is not in dual-write " +
                         "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
                 completionHandler.accept(null);
+                wakeup();
                 return;
             }
 
@@ -510,7 +541,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                     case PRE_MIGRATION:
                         if (isControllerQuorumReadyForMigration()) {
                             // Base case when starting the migration
-                            log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
+                            log.info("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
                             transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
                         }
                         break;
@@ -520,7 +551,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                             transitionTo(MigrationDriverState.INACTIVE);
                         } else {
                             // Base case when rebooting a controller during migration
-                            log.debug("Migration is in already progress, not waiting on ZK brokers.");
+                            log.info("Migration is in already progress, not waiting on ZK brokers.");
                             transitionTo(MigrationDriverState.BECOME_CONTROLLER);
                         }
                         break;
@@ -539,7 +570,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             switch (migrationState) {
                 case WAIT_FOR_BROKERS:
                     if (areZkBrokersReadyForMigration()) {
-                        log.debug("Zk brokers are registered and ready for migration");
+                        log.info("Zk brokers are registered and ready for migration");
                         transitionTo(MigrationDriverState.BECOME_CONTROLLER);
                     }
                     break;
@@ -556,7 +587,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
                 applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
                 if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
-                    log.debug("Unable to claim leadership, will retry until we learn of a different KRaft leader");
+                    log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
                 } else {
                     if (!migrationLeadershipState.initialZkMigrationComplete()) {
                         transitionTo(MigrationDriverState.ZK_MIGRATION);
@@ -573,22 +604,23 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         public void run() throws Exception {
             Set<Integer> brokersInMetadata = new HashSet<>();
             log.info("Starting ZK migration");
+            MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
             zkRecordConsumer.beginMigration();
             try {
-                AtomicInteger count = new AtomicInteger(0);
                 zkMigrationClient.readAllMetadata(batch -> {
                     try {
+                        log.info("Migrating {} records from ZK", batch.size());
                         if (log.isTraceEnabled()) {
-                            log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch));
-                        } else {
-                            log.info("Migrating {} records from ZK", batch.size());
+                            batch.forEach(apiMessageAndVersion ->
+                                log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
                         }
                         CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
                         FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
                             "the metadata layer to commit migration record batch",
                             future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
-                        count.addAndGet(batch.size());
+                        manifestBuilder.acceptBatch(batch);
                     } catch (Throwable e) {
+                        // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
                         throw new RuntimeException(e);
                     }
                 }, brokersInMetadata::add);
@@ -597,10 +629,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                     KRaftMigrationDriver.this.log, "",
                     "the metadata layer to complete the migration",
                     completeMigrationFuture, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
-                log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " +
-                         "generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
+                MigrationManifest manifest = manifestBuilder.build();
+                log.info("Completed migration of metadata from ZooKeeper to KRaft. {}. " +
+                         "The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
                          "migrated metadata {}.",
-                    count.get(),
+                    manifest,
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch(),
                     brokersInMetadata.size(),
@@ -615,6 +648,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
             } catch (Throwable t) {
                 zkRecordConsumer.abortMigration();
+                MigrationManifest partialManifest = manifestBuilder.build();
+                log.error("Aborted metadata migration from ZooKeeper to KRaft. {}.", partialManifest);
                 super.handleException(t);
             }
         }
@@ -690,7 +725,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             }
 
             // Poll again after some time
-            long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(pollTimeSupplier.nextPollTimeMs(), MILLISECONDS);
             eventQueue.scheduleDeferred(
                     "poll",
                     new EventQueue.DeadlineFunction(deadline),
@@ -698,31 +733,6 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
-    static String recordBatchToString(Collection<ApiMessageAndVersion> batch) {
-        String batchString = batch.stream().map(apiMessageAndVersion -> {
-            if (apiMessageAndVersion.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id()) {
-                StringBuilder sb = new StringBuilder();
-                sb.append("ApiMessageAndVersion(");
-                ConfigRecord record = (ConfigRecord) apiMessageAndVersion.message();
-                sb.append("ConfigRecord(");
-                sb.append("resourceType=");
-                sb.append(record.resourceType());
-                sb.append(", resourceName=");
-                sb.append(record.resourceName());
-                sb.append(", name=");
-                sb.append(record.name());
-                sb.append(")");
-                sb.append(" at version ");
-                sb.append(apiMessageAndVersion.version());
-                sb.append(")");
-                return sb.toString();
-            } else {
-                return apiMessageAndVersion.toString();
-            }
-        }).collect(Collectors.joining(","));
-        return "[" + batchString + "]";
-    }
-
     static KRaftMigrationOperationConsumer countingOperationConsumer(
         Map<String, Integer> dualWriteCounts,
         BiConsumer<String, KRaftMigrationOperation> operationConsumer
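
Since each migrated record is now TRACE-logged individually, the redactor is what keeps sensitive config values out of the log. A minimal sketch of that redaction path, using only classes this patch already imports; the record values here are made up, and the exact masked output format is up to RecordRedactor.

    import org.apache.kafka.common.metadata.ConfigRecord;
    import org.apache.kafka.metadata.KafkaConfigSchema;
    import org.apache.kafka.metadata.util.RecordRedactor;

    public class RedactorDemo {
        public static void main(String[] args) {
            // The schema tells the redactor which config keys hold secrets.
            // KafkaConfigSchema.EMPTY defines no configs, so values are treated
            // conservatively rather than echoed verbatim.
            RecordRedactor redactor = new RecordRedactor(KafkaConfigSchema.EMPTY);
            ConfigRecord record = new ConfigRecord()
                    .setResourceType((byte) 4)  // 4 = ConfigResource.Type.BROKER
                    .setResourceName("1")
                    .setName("ssl.keystore.password")
                    .setValue("secret");
            // Sensitive (or unknown) config values come back masked in the
            // loggable form instead of "secret".
            System.out.println(redactor.toLoggableString(record));
        }
    }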
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
new file mode 100644
index 00000000000..7d174b15e68
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holds the results of a ZK to KRaft metadata migration. The {@link #toString()} can be used to provide a
+ * human-readable summary of the migration.
+ */
+public class MigrationManifest {
+
+    public static class Builder {
+        private final Time time;
+        private final long startTimeNanos;
+        private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
+        private int batches = 0;
+        private int total = 0;
+        private long endTimeNanos = 0;
+
+        Builder(Time time) {
+            this.time = time;
+            this.startTimeNanos = time.nanoseconds();
+        }
+
+        public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+            batches++;
+            recordBatch.forEach(apiMessageAndVersion -> {
+                MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
+                counts.merge(type, 1, Integer::sum);
+                total++;
+            });
+        }
+
+        public MigrationManifest build() {
+            if (endTimeNanos == 0) {
+                endTimeNanos = time.nanoseconds();
+            }
+            return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
+        }
+    }
+
+    private final int totalRecords;
+    private final int totalBatches;
+    private final long durationNanos;
+    private final Map<MetadataRecordType, Integer> recordTypeCounts;
+
+    MigrationManifest(
+        int totalRecords,
+        int totalBatches,
+        long durationNanos,
+        Map<MetadataRecordType, Integer> recordTypeCounts
+    ) {
+        this.totalRecords = totalRecords;
+        this.totalBatches = totalBatches;
+        this.durationNanos = durationNanos;
+        this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
+    }
+
+    public static Builder newBuilder(Time time) {
+        return new Builder(time);
+    }
+
+    public long durationMs() {
+        return TimeUnit.NANOSECONDS.toMillis(durationNanos);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MigrationManifest that = (MigrationManifest) o;
+        return totalRecords == that.totalRecords &&
+            totalBatches == that.totalBatches &&
+            durationNanos == that.durationNanos &&
+            recordTypeCounts.equals(that.recordTypeCounts);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+    }
+
+    public String toString() {
+        return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
+            totalRecords, durationMs(), totalBatches, recordTypeCounts);
+    }
+}
\ No newline at end of file
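
The manifest's toString() is what the completion and abort log lines above interpolate. A minimal usage sketch against the same API, with a made-up single-record batch; the quoted output line is approximate.

    import java.util.Collections;
    import org.apache.kafka.common.metadata.TopicRecord;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.metadata.migration.MigrationManifest;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    public class ManifestDemo {
        public static void main(String[] args) {
            MigrationManifest.Builder builder = MigrationManifest.newBuilder(Time.SYSTEM);
            // The migration event calls acceptBatch once per batch read from ZK.
            builder.acceptBatch(Collections.singletonList(
                    new ApiMessageAndVersion(new TopicRecord().setName("foo"), (short) 0)));
            // Prints something like:
            // "1 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=1}"
            System.out.println(builder.build());
        }
    }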
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 3b13e065668..e530cc63aac 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.image.loader.LogDeltaManifest;
 import org.apache.kafka.image.loader.SnapshotManifest;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
+import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
@@ -214,6 +215,7 @@ public class KRaftMigrationDriverTest {
             metadataPublisher -> { },
             new MockFaultHandler("test"),
             quorumFeatures,
+            KafkaConfigSchema.EMPTY,
             mockTime
         )) {
 
@@ -299,6 +301,7 @@ public class KRaftMigrationDriverTest {
             metadataPublisher -> { },
             faultHandler,
             quorumFeatures,
+            KafkaConfigSchema.EMPTY,
             mockTime
         )) {
             MetadataImage image = MetadataImage.EMPTY;
@@ -344,6 +347,7 @@ public class KRaftMigrationDriverTest {
             },
             new MockFaultHandler("test"),
             quorumFeatures,
+            KafkaConfigSchema.EMPTY,
             mockTime
         )) {
 
@@ -390,6 +394,7 @@ public class KRaftMigrationDriverTest {
                 metadataPublisher -> { },
                 faultHandler,
                 quorumFeatures,
+                KafkaConfigSchema.EMPTY,
                 mockTime
         )) {
             MetadataImage image = MetadataImage.EMPTY;
@@ -461,6 +466,7 @@ public class KRaftMigrationDriverTest {
             metadataPublisher -> { },
             new MockFaultHandler("test"),
             quorumFeatures,
+            KafkaConfigSchema.EMPTY,
             mockTime
         )) {
             verifier.verify(driver, migrationClient, topicClient, configClient);