Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/04 19:06:19 UTC

[kafka] branch trunk updated: MINOR: improve QuorumController logging #13540

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 63f9f23ec0a MINOR: improve QuorumController logging #13540
63f9f23ec0a is described below

commit 63f9f23ec0aaa62f0da93ebc42934f5fce743ddb
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Thu May 4 11:18:03 2023 -0700

    MINOR: improve QuorumController logging #13540
    
    When creating the QuorumController, log whether ZK migration is enabled.
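
    For example, with migration enabled, the startup log line now reads as
    follows (the clusterId and authorizer values here are illustrative):

        Creating new QuorumController with clusterId wZoXPqWoSnaksLtMDpmjWg,
        authorizer Optional.empty. ZK migration mode is enabled.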
    
    When applying a FeatureLevelRecord that sets the metadata version, log the
    MetadataVersion enum rather than the numeric feature level.
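
    For example (the feature level shown is illustrative), the log line
    changes from:

        Setting metadata.version to 7

    to:

        Setting metadata version to 3.3-IV3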
    
    Improve the logging when we replay snapshots in QuorumController. Log both the beginning and the
    end of replay.
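
    For example (the offsets, epochs, and the snapshot name, as rendered by
    Snapshots.filenameFromSnapshotId, are illustrative):

        Starting to replay snapshot 00000000000000000123-0000000005, from last
        commit offset 100 and epoch 4
        ...
        Finished replaying snapshot 00000000000000000123-0000000005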
    
    When TRACE is enabled, log every record that is replayed in QuorumController. Since some records
    may contain sensitive information, create RecordRedactor to assist in logging only what is safe to
    put in the log4j file.
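
    As a usage sketch (the schema and record values below are made up; see the
    RecordRedactorTest added by this commit for the real setup):

        import java.util.Collections;
        import java.util.HashMap;
        import java.util.Map;

        import org.apache.kafka.common.config.ConfigDef;
        import org.apache.kafka.common.config.ConfigResource;
        import org.apache.kafka.common.metadata.ConfigRecord;
        import org.apache.kafka.metadata.KafkaConfigSchema;
        import org.apache.kafka.metadata.util.RecordRedactor;

        import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;

        public class RecordRedactorExample {
            public static void main(String[] args) {
                // The schema tells the redactor which configs are sensitive
                // (anything defined with ConfigDef.Type.PASSWORD).
                Map<ConfigResource.Type, ConfigDef> configs = new HashMap<>();
                configs.put(BROKER, new ConfigDef().
                        define("quux", ConfigDef.Type.PASSWORD,
                            ConfigDef.Importance.HIGH, "a secret config"));
                KafkaConfigSchema schema =
                    new KafkaConfigSchema(configs, Collections.emptyMap());
                RecordRedactor redactor = new RecordRedactor(schema);

                ConfigRecord record = new ConfigRecord().
                        setResourceType(BROKER.id()).
                        setResourceName("0").
                        setName("quux").
                        setValue("mysecret");

                // Prints the record with value='(redacted)' in place of the
                // password, so the string is safe to hand to the logger.
                System.out.println(redactor.toLoggableString(record));
            }
        }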
    
    Add logging to DeferredEventQueue (formerly ControllerPurgatory). Successful
    completions are logged at DEBUG, failures at INFO, and additions at TRACE.
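
    A minimal sketch of how these log lines are triggered (PrintingEvent is a
    made-up DeferredEvent implementation; per the code in this commit, the
    queue invokes complete(null) on success and complete(exception) on
    failure):

        import org.apache.kafka.common.utils.LogContext;
        import org.apache.kafka.deferred.DeferredEvent;
        import org.apache.kafka.deferred.DeferredEventQueue;

        public class DeferredEventQueueExample {
            static class PrintingEvent implements DeferredEvent {
                @Override
                public void complete(Throwable exception) {
                    System.out.println(exception == null ?
                        "completed" : "failed: " + exception);
                }
            }

            public static void main(String[] args) {
                DeferredEventQueue queue =
                    new DeferredEventQueue(new LogContext());
                queue.add(100, new PrintingEvent());   // addition logged at TRACE
                queue.completeUpTo(100);               // completion logged at DEBUG
                queue.failAll(new RuntimeException()); // remaining events, if any,
                                                       // are failed and logged at INFO
            }
        }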
    
    Remove SnapshotReason.java, SnapshotReasonTest.java, and
    QuorumController#generateSnapshotScheduled. They are dead code now that
    snapshot generation has moved to org.apache.kafka.image.publisher.SnapshotGenerator.
    
    Reviewers: David Arthur <mu...@gmail.com>, José Armando García Sancio <js...@apache.org>
---
 .../kafka/controller/FeatureControlManager.java    |   5 +-
 .../apache/kafka/controller/QuorumController.java  |  62 ++++++------
 .../apache/kafka/metadata/util/RecordRedactor.java |  66 +++++++++++++
 .../apache/kafka/metadata/util/SnapshotReason.java |  60 ------------
 .../kafka/metadata/util/RecordRedactorTest.java    | 105 +++++++++++++++++++++
 .../kafka/metadata/util/SnapshotReasonTest.java    |  80 ----------------
 .../apache/kafka/deferred/DeferredEventQueue.java  |  20 ++++
 .../kafka/deferred/DeferredEventQueueTest.java     |   7 +-
 8 files changed, 228 insertions(+), 177 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 319c9d537ee..b1975183916 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -324,8 +324,9 @@ public class FeatureControlManager {
                 "supports versions " + range);
         }
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
-            log.info("Setting metadata.version to {}", record.featureLevel());
-            metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+            MetadataVersion mv = MetadataVersion.fromFeatureLevel(record.featureLevel());
+            log.info("Setting metadata version to {}", mv);
+            metadataVersion.set(mv);
         } else {
             if (record.featureLevel() == 0) {
                 log.info("Removing feature {}", record.name());
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 16393983543..66fc2611a3d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -88,6 +88,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.metadata.migration.ZkRecordConsumer;
 import org.apache.kafka.metadata.placement.ReplicaPlacer;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
+import org.apache.kafka.metadata.util.RecordRedactor;
 import org.apache.kafka.deferred.DeferredEventQueue;
 import org.apache.kafka.deferred.DeferredEvent;
 import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
@@ -106,6 +107,7 @@ import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.slf4j.Logger;
 
@@ -130,7 +132,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -963,15 +964,8 @@ public final class QuorumController implements Controller {
                             // If the controller is a standby, replay the records that were
                             // created by the active controller.
                             if (log.isDebugEnabled()) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Replaying commits from the active node up to " +
-                                        "offset {} and epoch {}: {}.", offset, epoch, messages.stream()
-                                        .map(ApiMessageAndVersion::toString)
-                                        .collect(Collectors.joining(", ")));
-                                } else {
-                                    log.debug("Replaying commits from the active node up to " +
-                                        "offset {} and epoch {}.", offset, epoch);
-                                }
+                                log.debug("Replaying commits from the active node up to " +
+                                    "offset {} and epoch {}.", offset, epoch);
                             }
                             int i = 1;
                             for (ApiMessageAndVersion message : messages) {
@@ -1011,13 +1005,13 @@ public final class QuorumController implements Controller {
         public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                 try {
+                    String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
                     if (isActiveController()) {
-                        throw fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
-                            "(%s) when it is the active controller (%d)", reader.snapshotId(),
-                            curClaimEpoch));
+                        throw fatalFaultHandler.handleFault("Asked to load snapshot " + snapshotName +
+                            ", but we are the active controller at epoch " + curClaimEpoch);
                     }
-                    log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})",
-                        reader.snapshotId(), lastCommittedOffset, lastCommittedEpoch);
+                    log.info("Starting to replay snapshot {}, from last commit offset {} and epoch {}",
+                        snapshotName, lastCommittedOffset, lastCommittedEpoch);
 
                     resetToEmptyState();
 
@@ -1026,16 +1020,8 @@ public final class QuorumController implements Controller {
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
 
-                        if (log.isDebugEnabled()) {
-                            if (log.isTraceEnabled()) {
-                                log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
-                                    reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
-                                        collect(Collectors.joining(", ")));
-                            } else {
-                                log.debug("Replaying snapshot ({}) batch with last offset of {}",
-                                    reader.snapshotId(), offset);
-                            }
-                        }
+                        log.debug("Replaying snapshot {} batch with last offset of {}",
+                            snapshotName, offset);
 
                         int i = 1;
                         for (ApiMessageAndVersion message : messages) {
@@ -1052,6 +1038,7 @@ public final class QuorumController implements Controller {
                             i++;
                         }
                     }
+                    log.info("Finished replaying snapshot {}", snapshotName);
 
                     updateLastCommittedState(
                         reader.lastContainedLogOffset(),
@@ -1509,6 +1496,16 @@ public final class QuorumController implements Controller {
      *                          if this record is from a snapshot, this is used along with RegisterBrokerRecord
      */
     private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
+        if (log.isTraceEnabled()) {
+            if (snapshotId.isPresent()) {
+                log.trace("Replaying snapshot {} record {}",
+                    Snapshots.filenameFromSnapshotId(snapshotId.get()),
+                        recordRedactor.toLoggableString(message));
+            } else {
+                log.trace("Replaying log record {} with batchLastOffset {}",
+                        recordRedactor.toLoggableString(message), batchLastOffset);
+            }
+        }
         logReplayTracker.replay(message);
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
@@ -1780,11 +1777,6 @@ public final class QuorumController implements Controller {
      */
     private boolean noOpRecordScheduled = false;
 
-    /**
-     * Tracks if a snapshot generate was scheduled.
-     */
-    private boolean generateSnapshotScheduled = false;
-
     /**
      * The bootstrap metadata to use for initialization if needed.
      */
@@ -1799,6 +1791,10 @@ public final class QuorumController implements Controller {
      */
     private final int maxRecordsPerBatch;
 
+    /**
+     * Supports converting records to strings without disclosing passwords.
+     */
+    private final RecordRedactor recordRedactor;
 
     private QuorumController(
         FaultHandler fatalFaultHandler,
@@ -1835,7 +1831,7 @@ public final class QuorumController implements Controller {
         this.time = time;
         this.controllerMetrics = controllerMetrics;
         this.snapshotRegistry = new SnapshotRegistry(logContext);
-        this.deferredEventQueue = new DeferredEventQueue();
+        this.deferredEventQueue = new DeferredEventQueue(logContext);
         this.resourceExists = new ConfigResourceExistenceChecker();
         this.configurationControl = new ConfigurationControlManager.Builder().
             setLogContext(logContext).
@@ -1901,11 +1897,13 @@ public final class QuorumController implements Controller {
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
         this.zkRecordConsumer = new MigrationRecordConsumer();
         this.zkMigrationEnabled = zkMigrationEnabled;
+        this.recordRedactor = new RecordRedactor(configSchema);
         updateWriteOffset(-1);
 
         resetToEmptyState();
 
-        log.info("Creating new QuorumController with clusterId {}, authorizer {}.", clusterId, authorizer);
+        log.info("Creating new QuorumController with clusterId {}, authorizer {}.{}",
+                clusterId, authorizer, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
 
         this.raftClient.register(metaLogListener);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java b/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
new file mode 100644
index 00000000000..5641a0cc0ff
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+
+import java.util.Arrays;
+
+
+/**
+ * Converts a metadata record to a string suitable for logging to slf4j.
+ * This means that passwords and key material are omitted from the output.
+ */
+public final class RecordRedactor {
+    private final KafkaConfigSchema configSchema;
+
+    public RecordRedactor(KafkaConfigSchema configSchema) {
+        this.configSchema = configSchema;
+    }
+
+    public String toLoggableString(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case CONFIG_RECORD: {
+                if (!configSchema.isSensitive((ConfigRecord) message)) {
+                    return message.toString();
+                }
+                ConfigRecord duplicate = ((ConfigRecord) message).duplicate();
+                duplicate.setValue("(redacted)");
+                return duplicate.toString();
+            }
+            case USER_SCRAM_CREDENTIAL_RECORD: {
+                UserScramCredentialRecord record = (UserScramCredentialRecord) message;
+                return "UserScramCredentialRecord("
+                        + "name=" + ((record.name() == null) ? "null" : "'" + record.name().toString() + "'")
+                        + ", mechanism=" + record.mechanism()
+                        + ", salt=(redacted)"
+                        + ", storedKey=(redacted)"
+                        + ", serverKey=(redacted)"
+                        + ", iterations=" + record.iterations()
+                        + ")";
+            }
+            default:
+                return message.toString();
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java
index a65bdd5593b..e69de29bb2d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.metadata.util;
-
-import java.util.Collection;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.MetadataVersion;
-
-/**
- * Reason for generating a snapshot.
- */
-public final class SnapshotReason {
-    static public final SnapshotReason UNKNOWN = new SnapshotReason("unknown reason");
-
-    static public SnapshotReason maxBytesExceeded(long bytes, long maxBytes) {
-        return new SnapshotReason(String.format("%s bytes exceeded the maximum bytes of %s", bytes, maxBytes));
-    }
-
-    static public SnapshotReason maxIntervalExceeded(long interval, long maxInterval) {
-        return new SnapshotReason(
-            String.format("%s ms exceeded the maximum snapshot interval of %s ms", interval, maxInterval)
-        );
-    }
-
-    static public SnapshotReason metadataVersionChanged(MetadataVersion metadataVersion) {
-        return new SnapshotReason(String.format("metadata version was changed to %s", metadataVersion));
-    }
-
-    /**
-     * Converts a collection of reasons into a string.
-     */
-    static public String stringFromReasons(Collection<SnapshotReason> reasons) {
-        return Utils.join(reasons, ", ");
-    }
-
-    private final String reason;
-
-    private SnapshotReason(String reason) {
-        this.reason = reason;
-    }
-
-    @Override
-    public String toString() {
-        return reason;
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
new file mode 100644
index 00000000000..631d440ddf3
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/RecordRedactorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+final public class RecordRedactorTest {
+    public static final Map<ConfigResource.Type, ConfigDef> CONFIGS = new HashMap<>();
+
+    static {
+        CONFIGS.put(BROKER, new ConfigDef().
+                define("foobar", ConfigDef.Type.LIST, "1", ConfigDef.Importance.HIGH, "foo bar doc").
+                define("quux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc"));
+    }
+
+    private static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, Collections.emptyMap());
+
+    private static final RecordRedactor REDACTOR = new RecordRedactor(SCHEMA);
+
+    @Test
+    public void testTopicRecordToString() {
+        assertEquals("TopicRecord(name='foo', topicId=UOovKkohSU6AGdYW33ZUNg)",
+                REDACTOR.toLoggableString(new TopicRecord().
+                    setTopicId(Uuid.fromString("UOovKkohSU6AGdYW33ZUNg")).
+                    setName("foo")));
+    }
+
+    @Test
+    public void testUserScramCredentialRecordToString() {
+        assertEquals("UserScramCredentialRecord(name='bob', mechanism=0, " +
+            "salt=(redacted), storedKey=(redacted), serverKey=(redacted), iterations=128)",
+                REDACTOR.toLoggableString(new UserScramCredentialRecord().
+                    setName("bob").
+                    setMechanism((byte) 0).
+                    setSalt(new byte[512]).
+                    setServerKey(new byte[128]).
+                    setStoredKey(new byte[128]).
+                    setIterations(128)));
+    }
+
+    @Test
+    public void testUserScramCredentialRecordToStringWithNullName() {
+        assertEquals("UserScramCredentialRecord(name=null, mechanism=1, " +
+                        "salt=(redacted), storedKey=(redacted), serverKey=(redacted), iterations=256)",
+                REDACTOR.toLoggableString(new UserScramCredentialRecord().
+                        setName(null).
+                        setMechanism((byte) 1).
+                        setSalt(new byte[512]).
+                        setServerKey(new byte[128]).
+                        setStoredKey(new byte[128]).
+                        setIterations(256)));
+    }
+
+    @Test
+    public void testSensitiveConfigRecordToString() {
+        assertEquals("ConfigRecord(resourceType=4, resourceName='0', name='quux', " +
+            "value='(redacted)')",
+                REDACTOR.toLoggableString(new ConfigRecord().
+                    setResourceType(BROKER.id()).
+                    setResourceName("0").
+                    setName("quux").
+                    setValue("mysecret")));
+    }
+
+    @Test
+    public void testNonSensitiveConfigRecordToString() {
+        assertEquals("ConfigRecord(resourceType=4, resourceName='0', name='foobar', " +
+            "value='item1,item2')",
+                REDACTOR.toLoggableString(new ConfigRecord().
+                    setResourceType(BROKER.id()).
+                    setResourceName("0").
+                    setName("foobar").
+                    setValue("item1,item2")));
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/util/SnapshotReasonTest.java b/metadata/src/test/java/org/apache/kafka/metadata/util/SnapshotReasonTest.java
index e90a4d2928d..e69de29bb2d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/util/SnapshotReasonTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/SnapshotReasonTest.java
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.metadata.util;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public final class SnapshotReasonTest {
-    @Test
-    public void testUnknown() {
-        assertEquals("unknown reason", SnapshotReason.UNKNOWN.toString());
-    }
-
-    @Test
-    public void testMaxBytesExceeded() {
-        long bytes = 1000;
-        long maxBytes = 900;
-        String expectedMessage = "1000 bytes exceeded the maximum bytes of 900";
-
-        assertEquals(expectedMessage, SnapshotReason.maxBytesExceeded(bytes, maxBytes).toString());
-    }
-
-    @Test
-    public void testMaxIntervalExceeded() {
-        long interval = 1000;
-        long maxInterval = 900;
-        String expectedMessage = "1000 ms exceeded the maximum snapshot interval of 900 ms";
-
-        assertEquals(expectedMessage, SnapshotReason.maxIntervalExceeded(interval, maxInterval).toString());
-    }
-
-    @Test
-    public void testMetadataVersionChanged() {
-        MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
-        String expectedMessage = "metadata version was changed to 3.3-IV3";
-
-        assertEquals(expectedMessage, SnapshotReason.metadataVersionChanged(metadataVersion).toString());
-    }
-
-    @Test
-    public void testJoinReasons() {
-        long bytes = 1000;
-        long maxBytes = 900;
-        long interval = 1000;
-        long maxInterval = 900;
-        MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
-
-        List<SnapshotReason> reasons = Arrays.asList(
-            SnapshotReason.UNKNOWN,
-            SnapshotReason.maxBytesExceeded(bytes, maxBytes),
-            SnapshotReason.maxIntervalExceeded(interval, maxInterval),
-            SnapshotReason.metadataVersionChanged(metadataVersion)
-        );
-
-        String joinedReasons = SnapshotReason.stringFromReasons(reasons);
-
-        assertTrue(joinedReasons.contains("unknown reason"), joinedReasons);
-        assertTrue(joinedReasons.contains("1000 bytes exceeded the maximum bytes of 900"), joinedReasons);
-        assertTrue(joinedReasons.contains("1000 ms exceeded the maximum snapshot interval of 900 ms"), joinedReasons);
-        assertTrue(joinedReasons.contains("metadata version was changed to 3.3-IV3"), joinedReasons);
-    }
-}
diff --git a/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
index b66d0a3db8e..32e183ce9b0 100644
--- a/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.deferred;
 
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -29,12 +32,18 @@ import java.util.TreeMap;
  * We wait for the high watermark of the log to advance before completing them.
  */
 public class DeferredEventQueue {
+    private final Logger log;
+
     /**
      * A map from log offsets to events.  Each event will be completed once the log
      * advances past its offset.
      */
     private final TreeMap<Long, List<DeferredEvent>> pending = new TreeMap<>();
 
+    public DeferredEventQueue(LogContext logContext) {
+        this.log = logContext.logger(DeferredEventQueue.class);
+    }
+
     /**
      * Complete some purgatory entries.
      *
@@ -42,16 +51,23 @@ public class DeferredEventQueue {
      */
     public void completeUpTo(long offset) {
         Iterator<Entry<Long, List<DeferredEvent>>> iter = pending.entrySet().iterator();
+        int numCompleted = 0;
         while (iter.hasNext()) {
             Entry<Long, List<DeferredEvent>> entry = iter.next();
             if (entry.getKey() > offset) {
                 break;
             }
             for (DeferredEvent event : entry.getValue()) {
+                log.debug("completeUpTo({}): successfully completing {}", offset, event);
                 event.complete(null);
+                numCompleted++;
             }
             iter.remove();
         }
+        if (log.isTraceEnabled()) {
+            log.trace("completeUpTo({}): successfully completed {} deferred entries",
+                    offset, numCompleted);
+        }
     }
 
     /**
@@ -64,6 +80,7 @@ public class DeferredEventQueue {
         while (iter.hasNext()) {
             Entry<Long, List<DeferredEvent>> entry = iter.next();
             for (DeferredEvent event : entry.getValue()) {
+                log.info("failAll({}): failing {}.", exception.getClass().getSimpleName(), event);
                 event.complete(exception);
             }
             iter.remove();
@@ -91,6 +108,9 @@ public class DeferredEventQueue {
             pending.put(offset, events);
         }
         events.add(event);
+        if (log.isTraceEnabled()) {
+            log.trace("Adding deferred event {} at offset {}", event, offset);
+        }
     }
 
     /**
diff --git a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
index 09ec0994694..7c4f0e62a95 100644
--- a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
@@ -21,6 +21,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -51,7 +52,7 @@ public class DeferredEventQueueTest {
 
     @Test
     public void testCompleteEvents() {
-        DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
+        DeferredEventQueue deferredEventQueue = new DeferredEventQueue(new LogContext());
         SampleDeferredEvent event1 = new SampleDeferredEvent();
         SampleDeferredEvent event2 = new SampleDeferredEvent();
         SampleDeferredEvent event3 = new SampleDeferredEvent();
@@ -72,7 +73,7 @@ public class DeferredEventQueueTest {
 
     @Test
     public void testFailOnIncorrectOrdering() {
-        DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
+        DeferredEventQueue deferredEventQueue = new DeferredEventQueue(new LogContext());
         SampleDeferredEvent event1 = new SampleDeferredEvent();
         SampleDeferredEvent event2 = new SampleDeferredEvent();
         deferredEventQueue.add(2, event1);
@@ -81,7 +82,7 @@ public class DeferredEventQueueTest {
 
     @Test
     public void testFailEvents() {
-        DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
+        DeferredEventQueue deferredEventQueue = new DeferredEventQueue(new LogContext());
         SampleDeferredEvent event1 = new SampleDeferredEvent();
         SampleDeferredEvent event2 = new SampleDeferredEvent();
         SampleDeferredEvent event3 = new SampleDeferredEvent();