Posted to commits@kafka.apache.org by cm...@apache.org on 2023/04/12 22:44:18 UTC

[kafka] branch 3.4 updated: KAFKA-14857: Fix some MetadataLoader bugs (#13462)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 8f94b627aea KAFKA-14857: Fix some MetadataLoader bugs (#13462)
8f94b627aea is described below

commit 8f94b627aea3f6415165b100f5070ba9f2f2eb66
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Wed Mar 29 12:30:12 2023 -0700

    KAFKA-14857: Fix some MetadataLoader bugs (#13462)
    
    The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
    water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
    that and adds a JUnit test.
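
    A rough, self-contained sketch of the gate as it is meant to work after this
    fix (the wrapper class and constructor are illustrative only; the real check
    is MetadataLoader.stillNeedToCatchUp, shown in the diff further down):

        import java.util.OptionalLong;
        import java.util.function.Supplier;

        // Minimal sketch: hold back all publishing until we have loaded up to
        // (high water mark - 1), i.e. the last committed offset.
        class CatchUpGateSketch {
            private final Supplier<OptionalLong> highWaterMarkAccessor;
            // Start out catching up; nothing is published until the gate opens.
            private boolean catchingUp = true;

            CatchUpGateSketch(Supplier<OptionalLong> highWaterMarkAccessor) {
                this.highWaterMarkAccessor = highWaterMarkAccessor;
            }

            boolean stillNeedToCatchUp(long offset) {
                if (!catchingUp) return false;                  // gate already open
                OptionalLong hwm = highWaterMarkAccessor.get();
                if (!hwm.isPresent()) return true;              // high water mark still unknown
                if (hwm.getAsLong() - 1 > offset) return true;  // still behind the last committed offset
                catchingUp = false;                             // caught up; publish from here on
                return false;
            }
        }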
    
    Another issue is that the MetadataLoader previously assumed that we would periodically get
    callbacks from the Raft layer even if nothing had happened. We relied on this to install new
    publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
    NoOpRecord, this is not a safe assumption.
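
    The fix makes the loader drive publisher initialization itself: it schedules
    a deferred event on its own event queue, and that event re-schedules itself
    every 100 ms while the loader is still catching up. The real code uses
    KafkaEventQueue.scheduleDeferred with an EventQueue.EarliestDeadlineFunction
    (see the MetadataLoader diff below); the sketch here swaps in a plain JDK
    ScheduledExecutorService and a placeholder stillCatchingUp() check, so it
    only shows the retry shape, not the actual API.

        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;

        class PublisherInitRetrySketch {
            private final ScheduledExecutorService scheduler =
                    Executors.newSingleThreadScheduledExecutor();

            void scheduleInitializeNewPublishers(long delayMs) {
                scheduler.schedule(this::initializeNewPublishers, delayMs, TimeUnit.MILLISECONDS);
            }

            void initializeNewPublishers() {
                if (stillCatchingUp()) {
                    // No Raft callback is guaranteed to arrive (older MetadataVersions have no
                    // NoOpRecord), so poll again on our own timer instead of waiting for one.
                    scheduleInitializeNewPublishers(100);
                    return;
                }
                // ... hand each waiting publisher a snapshot of the current metadata image ...
            }

            private boolean stillCatchingUp() {
                return false;  // placeholder; the real loader checks its high water mark here
            }
        }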
    
    Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, fix the log prefix for
    BrokerLifecycleManager, and remove metadata publishers on BrokerServer shutdown (like we do for
    controllers).
    
    Reviewers: David Arthur <mu...@gmail.com>, dengziming <de...@gmail.com>
    
    Conflicts: This patch was cut down to make it easier to cherry-pick to this older branch.
    Specifically, I removed the BrokerLifecycleManager.scala logging change and the BrokerServer
    installPublishers and removeAndClosePublisher changes.
---
 .../apache/kafka/image/loader/MetadataLoader.java  | 160 +++++++++++++--------
 .../kafka/image/loader/MetadataLoaderTest.java     |  44 +++---
 .../image/publisher/SnapshotGeneratorTest.java     |   7 +-
 .../org/apache/kafka/queue/KafkaEventQueue.java    |   4 +
 4 files changed, 134 insertions(+), 81 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 906b0f32e32..4b820fbcdfb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -25,6 +25,7 @@ import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.image.writer.ImageReWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
@@ -41,6 +42,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -138,6 +140,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         }
     }
 
+    private static final String INITIALIZE_NEW_PUBLISHERS = "InitializeNewPublishers";
+
     /**
      * The log4j logger for this loader.
      */
@@ -164,7 +168,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
     private final Supplier<OptionalLong> highWaterMarkAccessor;
 
     /**
-     * Publishers which haven't been initialized yet.
+     * Publishers which haven't received any metadata yet.
      */
     private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers;
 
@@ -174,9 +178,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
     private final LinkedHashMap<String, MetadataPublisher> publishers;
 
     /**
-     * True if we have caught up with the initial high water mark.
+     * True if we have not caught up with the initial high water mark.
+     * We do not send out any metadata updates until this is true.
      */
-    private boolean catchingUp = false;
+    private boolean catchingUp = true;
 
     /**
      * The current leader and epoch.
@@ -209,36 +214,65 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         this.uninitializedPublishers = new LinkedHashMap<>();
         this.publishers = new LinkedHashMap<>();
         this.image = MetadataImage.EMPTY;
-        this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
     }
 
-    private boolean stillNeedToCatchUp(long offset) {
+    private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
-            log.trace("We are not in the initial catching up state.");
+            log.trace("{}: we are not in the initial catching up state.", where);
             return false;
         }
         OptionalLong highWaterMark = highWaterMarkAccessor.get();
         if (!highWaterMark.isPresent()) {
-            log.info("The loader is still catching up because we still don't know the high " +
-                    "water mark yet.");
+            log.info("{}: the loader is still catching up because we still don't know the high " +
+                    "water mark yet.", where);
             return true;
         }
-        if (highWaterMark.getAsLong() > offset) {
-            log.info("The loader is still catching up because we have loaded up to offset " +
-                    offset + ", but the high water mark is " + highWaterMark.getAsLong());
+        if (highWaterMark.getAsLong() - 1 > offset) {
+            log.info("{}: The loader is still catching up because we have loaded up to offset " +
+                    offset + ", but the high water mark is {}", where, highWaterMark.getAsLong());
             return true;
         }
-        log.info("The loader finished catch up to the current high water mark of " +
-                highWaterMark.getAsLong());
-        catchingUp = true;
+        log.info("{}: The loader finished catching up to the current high water mark of {}",
+                where, highWaterMark.getAsLong());
+        catchingUp = false;
         return false;
     }
 
-    private void maybeInitializeNewPublishers() {
+    /**
+     * Schedule an event to initialize the new publishers that are present in the system.
+     *
+     * @param delayNs   The minimum time in nanoseconds we should wait. If there is already an
+     *                  initialization event scheduled, we will either move its deadline forward
+     *                  in time or leave it unchanged.
+     */
+    void scheduleInitializeNewPublishers(long delayNs) {
+        eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS,
+            new EventQueue.EarliestDeadlineFunction(eventQueue.time().nanoseconds() + delayNs),
+            () -> {
+                try {
+                    initializeNewPublishers();
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Unhandled error initializing new publishers", e);
+                }
+            });
+    }
+
+    void initializeNewPublishers() {
         if (uninitializedPublishers.isEmpty()) {
-            log.trace("There are no uninitialized publishers to initialize.");
+            log.debug("InitializeNewPublishers: nothing to do.");
+            return;
+        }
+        if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) {
+            // Reschedule the initialization for later.
+            log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} " +
+                            "because we are still catching up with quorum metadata. Rescheduling.",
+                    uninitializedPublisherNames());
+            scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100));
             return;
         }
+        log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}",
+                uninitializedPublisherNames());
         long startNs = time.nanoseconds();
         MetadataDelta delta = new MetadataDelta.Builder().
                 setImage(image).
@@ -255,18 +289,21 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
             MetadataPublisher publisher = iter.next();
             iter.remove();
             try {
-                log.info("Publishing initial snapshot at offset {} to {}",
-                        image.highestOffsetAndEpoch().offset(), publisher.name());
+                log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}",
+                        publisher.name(), image.highestOffsetAndEpoch().offset());
                 publisher.publishSnapshot(delta, image, manifest);
                 publishers.put(publisher.name(), publisher);
             } catch (Throwable e) {
-                faultHandler.handleFault("Unhandled error publishing the initial metadata " +
-                        "image from snapshot at offset " + image.highestOffsetAndEpoch().offset() +
-                        " with publisher " + publisher.name(), e);
+                faultHandler.handleFault("Unhandled error initializing " + publisher.name() +
+                        " with a snapshot at offset " + image.highestOffsetAndEpoch().offset(), e);
             }
         }
     }
 
+    private String uninitializedPublisherNames() {
+        return String.join(", ", uninitializedPublishers.keySet());
+    }
+
     @Override
     public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
         eventQueue.append(() -> {
@@ -275,11 +312,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                         setImage(image).
                         build();
                 LogDeltaManifest manifest = loadLogDelta(delta, reader);
-                if (log.isDebugEnabled()) {
-                    log.debug("Generated a metadata delta between {} and {} from {} batch(es) " +
-                            "in {} us.", image.offset(), manifest.provenance().lastContainedOffset(),
-                            manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
-                }
+                log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " +
+                        "in {} us.", manifest.provenance().lastContainedOffset(),
+                        NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
@@ -288,10 +323,12 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                             " and " + manifest.provenance().lastContainedOffset(), e);
                     return;
                 }
-                if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
+                if (stillNeedToCatchUp("handleCommit", manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.debug("Publishing new image with provenance {}.", image.provenance());
+                if (log.isDebugEnabled()) {
+                    log.debug("handleCommit: publishing new image with provenance {}.", image.provenance());
+                }
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.publishLogDelta(delta, image, manifest);
@@ -301,8 +338,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                                 " with publisher " + publisher.name(), e);
                     }
                 }
-                maybeInitializeNewPublishers();
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (!uninitializedPublishers.isEmpty()) {
+                    scheduleInitializeNewPublishers(0);
+                }
             } catch (Throwable e) {
                 // This is a general catch-all block where we don't expect to end up;
                 // failure-prone operations should have individual try/catch blocks around them.
@@ -373,11 +412,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                         setImage(image).
                         build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                if (log.isDebugEnabled()) {
-                    log.debug("Generated a metadata delta from a snapshot at offset {} " +
-                            "in {} us.", manifest.provenance().lastContainedOffset(),
-                            NANOSECONDS.toMicros(manifest.elapsedNs()));
-                }
+                log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " +
+                        "in {} us.", manifest.provenance().lastContainedOffset(),
+                        NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
@@ -385,10 +422,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                             "snapshot at offset " + reader.lastContainedLogOffset(), e);
                     return;
                 }
-                if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
+                if (stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.debug("Publishing new snapshot image with provenance {}.", image.provenance());
+                log.info("handleSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.publishSnapshot(delta, image, manifest);
@@ -398,8 +435,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                                     " with publisher " + publisher.name(), e);
                     }
                 }
-                maybeInitializeNewPublishers();
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (!uninitializedPublishers.isEmpty()) {
+                    scheduleInitializeNewPublishers(0);
+                }
             } catch (Throwable e) {
                 // This is a general catch-all block where we don't expect to end up;
                 // failure-prone operations should have individual try/catch blocks around them.
@@ -465,7 +504,29 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         CompletableFuture<Void> future = new CompletableFuture<>();
         eventQueue.append(() -> {
             try {
-                installNewPublishers(newPublishers);
+                // Check that none of the publishers we are trying to install are already present.
+                for (MetadataPublisher newPublisher : newPublishers) {
+                    MetadataPublisher prev = publishers.get(newPublisher.name());
+                    if (prev == null) {
+                        prev = uninitializedPublishers.get(newPublisher.name());
+                    }
+                    if (prev != null) {
+                        if (prev == newPublisher) {
+                            throw faultHandler.handleFault("Attempted to install publisher " +
+                                    newPublisher.name() + ", which is already installed.");
+                        } else {
+                            throw faultHandler.handleFault("Attempted to install a new publisher " +
+                                    "named " + newPublisher.name() + ", but there is already a publisher " +
+                                    "with that name.");
+                        }
+                    }
+                }
+                // After installation, new publishers must be initialized by sending them a full
+                // snapshot of the current state. However, we cannot necessarily do that immediately,
+                // because the loader itself might not be ready. Therefore, we schedule a background
+                // task.
+                newPublishers.forEach(p -> uninitializedPublishers.put(p.name(), p));
+                scheduleInitializeNewPublishers(0);
                 future.complete(null);
             } catch (Throwable e) {
                 future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " +
@@ -475,29 +536,6 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         return future;
     }
 
-    void installNewPublishers(
-        List<? extends MetadataPublisher> newPublishers
-    ) {
-        // Publishers can't be re-installed if they're already present.
-        for (MetadataPublisher newPublisher : newPublishers) {
-            MetadataPublisher prev = publishers.get(newPublisher.name());
-            if (prev == null) {
-                prev = uninitializedPublishers.get(newPublisher.name());
-            }
-            if (prev != null) {
-                if (prev == newPublisher) {
-                    throw faultHandler.handleFault("Attempted to install publisher " +
-                            newPublisher.name() + ", which is already installed.");
-                } else {
-                    throw faultHandler.handleFault("Attempted to install a new publisher " +
-                            "named " + newPublisher.name() + ", but there is already a publisher " +
-                            "with that name.");
-                }
-            }
-            uninitializedPublishers.put(newPublisher.name(), newPublisher);
-        }
-    }
-
     // VisibleForTesting
     void waitForAllEventsToBeHandled() throws Exception {
         CompletableFuture<Void> future = new CompletableFuture<>();
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index b7e49243c43..489a9bca80e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -42,7 +42,10 @@ import org.junit.jupiter.params.provider.CsvSource;
 import java.util.Iterator;
 import java.util.List;
 import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -71,12 +74,13 @@ public class MetadataLoaderTest {
     }
 
     static class MockPublisher implements MetadataPublisher {
+        final CompletableFuture<Void> firstPublish = new CompletableFuture<>();
         private final String name;
-        MetadataDelta latestDelta = null;
-        MetadataImage latestImage = null;
-        LogDeltaManifest latestLogDeltaManifest = null;
-        SnapshotManifest latestSnapshotManifest = null;
-        boolean closed = false;
+        volatile MetadataDelta latestDelta = null;
+        volatile MetadataImage latestImage = null;
+        volatile LogDeltaManifest latestLogDeltaManifest = null;
+        volatile SnapshotManifest latestSnapshotManifest = null;
+        volatile boolean closed = false;
 
         MockPublisher() {
             this("MockPublisher");
@@ -100,6 +104,7 @@ public class MetadataLoaderTest {
             latestDelta = delta;
             latestImage = newImage;
             latestSnapshotManifest = manifest;
+            firstPublish.complete(null);
         }
 
         @Override
@@ -111,10 +116,12 @@ public class MetadataLoaderTest {
             latestDelta = delta;
             latestImage = newImage;
             latestLogDeltaManifest = manifest;
+            firstPublish.complete(null);
         }
 
         @Override
         public void close() throws Exception {
+            firstPublish.completeExceptionally(new RejectedExecutionException());
             closed = true;
         }
     }
@@ -264,7 +271,7 @@ public class MetadataLoaderTest {
                 new MockPublisher("c"));
         try (MetadataLoader loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
-                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers.subList(0, 2)).get();
             loader.removeAndClosePublisher(publishers.get(1)).get();
@@ -278,6 +285,7 @@ public class MetadataLoaderTest {
             loader.handleSnapshot(snapshotReader);
             loader.waitForAllEventsToBeHandled();
             assertTrue(snapshotReader.closed);
+            publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
             loader.removeAndClosePublisher(publishers.get(0)).get();
         }
         assertTrue(publishers.get(0).closed);
@@ -304,6 +312,7 @@ public class MetadataLoaderTest {
                 setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
                 build()) {
             loader.installPublishers(publishers).get();
+            publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
             loadEmptySnapshot(loader, 200);
             assertEquals(200L, loader.lastAppliedOffset());
             loadEmptySnapshot(loader, 300);
@@ -398,12 +407,13 @@ public class MetadataLoaderTest {
         try (MetadataLoader loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
                 setTime(time).
-                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers).get();
             loadTestSnapshot(loader, 200);
+            publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
             MockBatchReader batchReader = new MockBatchReader(300, asList(
-                Batch.control(300, 100, 4000, 10, 400))).
+                    Batch.control(300, 100, 4000, 10, 400))).
                     setTime(time);
             loader.handleCommit(batchReader);
             loader.waitForAllEventsToBeHandled();
@@ -429,7 +439,7 @@ public class MetadataLoaderTest {
                 new MockPublisher("b"));
         try (MetadataLoader loader = new MetadataLoader.Builder().
                 setFaultHandler(faultHandler).
-                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
                 build()) {
             loader.installPublishers(publishers).get();
             loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
@@ -441,6 +451,9 @@ public class MetadataLoaderTest {
                         setName("foo").
                         setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
                 )));
+            for (MockPublisher publisher : publishers) {
+                publisher.firstPublish.get(1, TimeUnit.MINUTES);
+            }
             loader.waitForAllEventsToBeHandled();
             assertEquals(200L, loader.lastAppliedOffset());
             loader.handleCommit(new MockBatchReader(201, asList(
@@ -463,6 +476,7 @@ public class MetadataLoaderTest {
      * Test that we do not leave the catchingUp state until we have loaded up to the high
      * water mark.
      */
+    @Test
     public void testCatchingUpState() throws Exception {
         MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
         List<MockPublisher> publishers = asList(new MockPublisher("a"),
@@ -478,22 +492,18 @@ public class MetadataLoaderTest {
             // We don't update lastAppliedOffset because we're still in catchingUp state due to
             // highWaterMark being OptionalLong.empty (aka unknown).
             assertEquals(-1L, loader.lastAppliedOffset());
+            assertFalse(publishers.get(0).firstPublish.isDone());
 
-            // Setting the high water mark here doesn't do anything because we only check it when
-            // we're publishing an update. This is OK because we know that we'll get updates
-            // frequently. If there is no other activity, there will at least be NoOpRecords.
-            highWaterMark.set(OptionalLong.of(0));
-            assertEquals(-1L, loader.lastAppliedOffset());
-
-            // This still doesn't advance lastAppliedOffset since the high water mark at 220
+            // This still doesn't advance lastAppliedOffset since the high water mark at 221
             // is greater than our snapshot at 210.
             // is greater than our snapshot at 210.
-            highWaterMark.set(OptionalLong.of(220));
+            highWaterMark.set(OptionalLong.of(221));
             loadTestSnapshot(loader, 210);
             assertEquals(-1L, loader.lastAppliedOffset());
 
             // Loading a test snapshot at 220 allows us to leave catchUp state.
             loadTestSnapshot(loader, 220);
             assertEquals(220L, loader.lastAppliedOffset());
+            publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
         }
         faultHandler.maybeRethrowFirstException();
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index 47befabcaba..e03c4eb0bff 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 public class SnapshotGeneratorTest {
     static class MockEmitter implements SnapshotGenerator.Emitter {
         private final CountDownLatch latch = new CountDownLatch(1);
-        private final List<MetadataImage> images = new ArrayList<>();
+        private final List<MetadataImage> images = new CopyOnWriteArrayList<>();
         private RuntimeException problem = null;
 
         MockEmitter setReady() {
@@ -66,14 +67,14 @@ public class SnapshotGeneratorTest {
                 throw currentProblem;
             }
             try {
-                latch.await();
+                latch.await(30, TimeUnit.SECONDS);
             } catch (Throwable e) {
                 throw new RuntimeException(e);
             }
             images.add(image);
         }
 
-        synchronized List<MetadataImage> images() {
+        List<MetadataImage> images() {
             return new ArrayList<>(images);
         }
     }
diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index b53339edfe3..269eacfc2c2 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -390,6 +390,10 @@ public final class KafkaEventQueue implements EventQueue {
         this.eventHandlerThread.start();
     }
 
+    public Time time() {
+        return time;
+    }
+
     @Override
     public void enqueue(EventInsertionType insertionType,
                         String tag,