You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/13 18:24:03 UTC

[kafka] branch 3.3 updated (e2136746159 -> 7617f1a0c94)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from e2136746159 KAFKA-14225; Fix deadlock caused by lazy val exemptSensor (#12634)
     new 850612618fd MINOR: fix indentation and add builders in some KRaft tests (#12720)
     new 7617f1a0c94 KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/scala/kafka/server/ControllerServer.scala |   1 -
 .../kafka/server/QuorumTestHarness.scala           |   2 +-
 .../apache/kafka/controller/QuorumController.java  |  20 +-
 .../kafka/controller/QuorumControllerTest.java     | 970 ++++++++++++---------
 .../kafka/controller/QuorumControllerTestEnv.java  |  93 +-
 .../apache/kafka/metalog/LocalLogManagerTest.java  |  25 +-
 .../kafka/metalog/LocalLogManagerTestEnv.java      |  71 +-
 7 files changed, 664 insertions(+), 518 deletions(-)


[kafka] 02/02: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7617f1a0c949352512640e947c9b59cf34bdfceb
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Tue Oct 11 09:46:42 2022 -0700

    KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)
    
    Make all faults in metadata processing on standby controllers fatal, matching the behavior of the active controller. This prevents a standby controller from eventually becoming active with incomplete state.
    
    Reviewers: Colin Patrick McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ControllerServer.scala |  1 -
 .../kafka/server/QuorumTestHarness.scala           |  2 +-
 .../apache/kafka/controller/QuorumController.java  | 20 +--------
 .../kafka/controller/QuorumControllerTest.java     | 49 ++++++++++++++++++++--
 .../kafka/controller/QuorumControllerTestEnv.java  | 31 ++++++++------
 5 files changed, 66 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 441171fe1d9..8e52f30d2ea 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -209,7 +209,6 @@ class ControllerServer(
           setConfigurationValidator(new ControllerConfigurationValidator()).
           setStaticConfig(config.originals).
           setBootstrapMetadata(bootstrapMetadata).
-          setMetadataFaultHandler(metadataFaultHandler).
           setFatalFaultHandler(fatalFaultHandler)
       }
       authorizer match {
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 6bb2e769cda..f456770f0f7 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -323,7 +323,7 @@ abstract class QuorumTestHarness extends Logging {
         raftApiVersions = raftManager.apiVersions,
         bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "test harness"),
         metadataFaultHandler = faultHandler,
-        fatalFaultHandler = faultHandler,
+        fatalFaultHandler = faultHandler
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
         if (e != null) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ad601b0079..034ef1e216c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -157,7 +157,6 @@ public final class QuorumController implements Controller {
         private final int nodeId;
         private final String clusterId;
         private FaultHandler fatalFaultHandler = null;
-        private FaultHandler metadataFaultHandler = null;
         private Time time = Time.SYSTEM;
         private String threadNamePrefix = null;
         private LogContext logContext = null;
@@ -190,11 +189,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
-            this.metadataFaultHandler = metadataFaultHandler;
-            return this;
-        }
-
         public int nodeId() {
             return nodeId;
         }
@@ -314,8 +308,6 @@ public final class QuorumController implements Controller {
                 throw new IllegalStateException("You must specify the quorum features");
             } else if (fatalFaultHandler == null) {
                 throw new IllegalStateException("You must specify a fatal fault handler.");
-            } else if (metadataFaultHandler == null) {
-                throw new IllegalStateException("You must specify a metadata fault handler.");
             }
 
             if (threadNamePrefix == null) {
@@ -334,7 +326,6 @@ public final class QuorumController implements Controller {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
                 return new QuorumController(
                     fatalFaultHandler,
-                    metadataFaultHandler,
                     logContext,
                     nodeId,
                     clusterId,
@@ -983,7 +974,7 @@ public final class QuorumController implements Controller {
                                             "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
                                             message.message().getClass().getSimpleName(), i, messages.size(),
                                             batch.baseOffset());
-                                    throw metadataFaultHandler.handleFault(failureMessage, e);
+                                    throw fatalFaultHandler.handleFault(failureMessage, e);
                                 }
                                 i++;
                             }
@@ -1039,7 +1030,7 @@ public final class QuorumController implements Controller {
                                         "%d record(s) in the batch with baseOffset %d.",
                                         message.message().getClass().getSimpleName(), reader.snapshotId(),
                                         i, messages.size(), batch.baseOffset());
-                                throw metadataFaultHandler.handleFault(failureMessage, e);
+                                throw fatalFaultHandler.handleFault(failureMessage, e);
                             }
                             i++;
                         }
@@ -1489,11 +1480,6 @@ public final class QuorumController implements Controller {
      */
     private final FaultHandler fatalFaultHandler;
 
-    /**
-     * Handles faults in metadata handling that are normally not fatal.
-     */
-    private final FaultHandler metadataFaultHandler;
-
     /**
      * The slf4j log context, used to create new loggers.
      */
@@ -1702,7 +1688,6 @@ public final class QuorumController implements Controller {
 
     private QuorumController(
         FaultHandler fatalFaultHandler,
-        FaultHandler metadataFaultHandler,
         LogContext logContext,
         int nodeId,
         String clusterId,
@@ -1728,7 +1713,6 @@ public final class QuorumController implements Controller {
         int maxRecordsPerBatch
     ) {
         this.fatalFaultHandler = fatalFaultHandler;
-        this.metadataFaultHandler = metadataFaultHandler;
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index abd214f7e5d..a5e7258a724 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1315,10 +1315,51 @@ public class QuorumControllerTest {
                                         setValue(null), (short) 0)), null);
                     });
             assertThrows(ExecutionException.class, () -> future.get());
-            assertEquals(NullPointerException.class,
-                    controlEnv.fatalFaultHandler().firstException().getCause().getClass());
-            controlEnv.fatalFaultHandler().setIgnore(true);
-            controlEnv.metadataFaultHandler().setIgnore(true);
+            assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId())
+                .firstException().getCause().getClass());
+            controlEnv.ignoreFatalFaults();
+        }
+    }
+
+    @Test
+    public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
+        InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(Arrays.asList(
+            new ApiMessageAndVersion(new PartitionRecord(), (short) 0)))
+        );
+
+        LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3)
+            .setSnapshotReader(FileRawSnapshotReader.open(
+                invalidSnapshot.tempDir.toPath(),
+                new OffsetAndEpoch(0, 0)
+            ));
+
+        try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
+                TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
+                    return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
+                }),
+                    "At least one controller failed to detect the fatal fault"
+                );
+                controlEnv.ignoreFatalFaults();
+            }
+        }
+    }
+
+    @Test
+    public void testFatalMetadataErrorDuringLogLoading() throws Exception {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
+            logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(
+                new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
+            ));
+
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
+                TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
+                    return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
+                }),
+                    "At least one controller failed to detect the fatal fault"
+                );
+                controlEnv.ignoreFatalFaults();
+            }
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 993ba848aad..b2fe8785661 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -26,7 +26,9 @@ import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
@@ -40,8 +42,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 public class QuorumControllerTestEnv implements AutoCloseable {
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
-    private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
-    private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
+    private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new HashMap<>();
 
     public static class Builder {
         private final LocalLogManagerTestEnv logEnv;
@@ -98,17 +99,18 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         try {
             ApiVersions apiVersions = new ApiVersions();
             List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
-            for (int i = 0; i < numControllers; i++) {
-                QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
-                builder.setRaftClient(logEnv.logManagers().get(i));
+            for (int nodeId = 0; nodeId < numControllers; nodeId++) {
+                QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
+                builder.setRaftClient(logEnv.logManagers().get(nodeId));
                 builder.setBootstrapMetadata(bootstrapMetadata);
                 builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
-                builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
+                builder.setQuorumFeatures(new QuorumFeatures(nodeId, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                 });
+                MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
                 builder.setFatalFaultHandler(fatalFaultHandler);
-                builder.setMetadataFaultHandler(metadataFaultHandler);
+                fatalFaultHandlers.put(nodeId, fatalFaultHandler);
                 controllerBuilderInitializer.accept(builder);
                 this.controllers.add(builder.build());
             }
@@ -142,12 +144,14 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         return controllers;
     }
 
-    public MockFaultHandler fatalFaultHandler() {
-        return fatalFaultHandler;
+    public MockFaultHandler fatalFaultHandler(Integer nodeId) {
+        return fatalFaultHandlers.get(nodeId);
     }
 
-    public MockFaultHandler metadataFaultHandler() {
-        return metadataFaultHandler;
+    public void ignoreFatalFaults() {
+        for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
+            faultHandler.setIgnore(true);
+        }
     }
 
     @Override
@@ -158,7 +162,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         for (QuorumController controller : controllers) {
             controller.close();
         }
-        fatalFaultHandler.maybeRethrowFirstException();
-        metadataFaultHandler.maybeRethrowFirstException();
+        for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
+            faultHandler.maybeRethrowFirstException();
+        }
     }
 }


[kafka] 01/02: MINOR: fix indentation and add builders in some KRaft tests (#12720)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 850612618fd3c81abd22c69786337e8ccdc4b24f
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Oct 7 13:53:41 2022 -0700

    MINOR: fix indentation and add builders in some KRaft tests (#12720)
    
    Add builders for LocalLogManagerTestEnv and QuorumControllerTestEnv, since the constructor
    overloads were starting to get unwieldy.
    
    Make indentation more consistent in QuorumControllerTest. Take advantage of the fact that you can
    initialize multiple resources in a Java try-with-resources block to avoid excessive indentation in a few
    cases.
    
    Reviewers: José Armando García Sancio <js...@apache.org>
---
 .../kafka/controller/QuorumControllerTest.java     | 931 +++++++++++----------
 .../kafka/controller/QuorumControllerTestEnv.java  |  62 +-
 .../apache/kafka/metalog/LocalLogManagerTest.java  |  25 +-
 .../kafka/metalog/LocalLogManagerTestEnv.java      |  71 +-
 4 files changed, 603 insertions(+), 486 deletions(-)

diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a1a3741b6a3..abd214f7e5d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -145,9 +145,13 @@ public class QuorumControllerTest {
     public void testCreateAndClose() throws Throwable {
         MockControllerMetrics metrics = new MockControllerMetrics();
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv =
-                new QuorumControllerTestEnv(logEnv, builder -> builder.setMetrics(metrics))
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setMetrics(metrics);
+                }).
+                build()
         ) {
         }
         assertTrue(metrics.isClosed(), "metrics were not closed");
@@ -159,10 +163,13 @@ public class QuorumControllerTest {
     @Test
     public void testConfigurationOperations() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
@@ -197,10 +204,13 @@ public class QuorumControllerTest {
     @Test
     public void testDelayedConfigurationOperations() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
@@ -211,9 +221,10 @@ public class QuorumControllerTest {
         }
     }
 
-    private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
-                                                    QuorumController controller)
-                                                    throws Throwable {
+    private void testDelayedConfigurationOperations(
+        LocalLogManagerTestEnv logEnv,
+        QuorumController controller
+    ) throws Throwable {
         logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
         CompletableFuture<Map<ConfigResource, ApiError>> future1 =
             controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
@@ -237,14 +248,15 @@ public class QuorumControllerTest {
         long sessionTimeoutMillis = 1000;
 
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv,
-                b -> {
-                    b.setConfigSchema(SCHEMA);
-                },
-                OptionalLong.of(sessionTimeoutMillis),
-                OptionalLong.empty(),
-                SIMPLE_BOOTSTRAP);
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
+                setBootstrapMetadata(SIMPLE_BOOTSTRAP).
+                build();
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -334,14 +346,16 @@ public class QuorumControllerTest {
         long leaderImbalanceCheckIntervalNs = 1_000_000_000;
 
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv,
-                b -> {
-                    b.setConfigSchema(SCHEMA);
-                },
-                OptionalLong.of(sessionTimeoutMillis),
-                OptionalLong.of(leaderImbalanceCheckIntervalNs),
-                SIMPLE_BOOTSTRAP);
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
+                setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).
+                setBootstrapMetadata(SIMPLE_BOOTSTRAP).
+                build();
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -467,11 +481,14 @@ public class QuorumControllerTest {
         long maxIdleIntervalNs = 1_000;
         long maxReplicationDelayMs = 60_000;
         try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-                b.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
-            });
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                    controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
+                }).
+                build();
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -509,61 +526,65 @@ public class QuorumControllerTest {
 
     @Test
     public void testUnregisterBroker() throws Throwable {
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
-                ListenerCollection listeners = new ListenerCollection();
-                listeners.add(new Listener().setName("PLAINTEXT").
-                    setHost("localhost").setPort(9092));
-                QuorumController active = controlEnv.activeController();
-                CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
-                    ANONYMOUS_CONTEXT,
-                    new BrokerRegistrationRequestData().
-                        setBrokerId(0).
-                        setClusterId(active.clusterId()).
-                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
-                        setListeners(listeners));
-                assertEquals(2L, reply.get().epoch());
-                CreateTopicsRequestData createTopicsRequestData =
-                    new CreateTopicsRequestData().setTopics(
-                        new CreatableTopicCollection(Collections.singleton(
-                            new CreatableTopic().setName("foo").setNumPartitions(1).
-                                setReplicationFactor((short) 1)).iterator()));
-                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
-                    ANONYMOUS_CONTEXT,
-                    createTopicsRequestData, Collections.singleton("foo")).get().
-                        topics().find("foo").errorCode());
-                assertEquals("Unable to replicate the partition 1 time(s): All brokers " +
-                    "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT,
-                        createTopicsRequestData, Collections.singleton("foo")).
-                            get().topics().find("foo").errorMessage());
-                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                            setWantFence(false).setBrokerEpoch(2L).setBrokerId(0).
-                            setCurrentMetadataOffset(100000L)).get());
-                assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
+        ) {
+            ListenerCollection listeners = new ListenerCollection();
+            listeners.add(new Listener().setName("PLAINTEXT").
+                setHost("localhost").setPort(9092));
+            QuorumController active = controlEnv.activeController();
+            CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                ANONYMOUS_CONTEXT,
+                new BrokerRegistrationRequestData().
+                    setBrokerId(0).
+                    setClusterId(active.clusterId()).
+                    setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                    setListeners(listeners));
+            assertEquals(2L, reply.get().epoch());
+            CreateTopicsRequestData createTopicsRequestData =
+                new CreateTopicsRequestData().setTopics(
+                    new CreatableTopicCollection(Collections.singleton(
+                        new CreatableTopic().setName("foo").setNumPartitions(1).
+                            setReplicationFactor((short) 1)).iterator()));
+            assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
+                ANONYMOUS_CONTEXT,
+                createTopicsRequestData, Collections.singleton("foo")).get().
+                    topics().find("foo").errorCode());
+            assertEquals("Unable to replicate the partition 1 time(s): All brokers " +
+                "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT,
                     createTopicsRequestData, Collections.singleton("foo")).
-                        get().topics().find("foo").errorCode());
-                CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent(
-                    "debugGetPartition", OptionalLong.empty(), () -> {
-                        Iterator<TopicIdPartition> iterator = active.
-                            replicationControl().brokersToIsrs().iterator(0, true);
-                        assertTrue(iterator.hasNext());
-                        return iterator.next();
-                    });
-                assertEquals(0, topicPartitionFuture.get().partitionId());
-                active.unregisterBroker(ANONYMOUS_CONTEXT, 0).get();
-                topicPartitionFuture = active.appendReadEvent(
-                    "debugGetPartition", OptionalLong.empty(), () -> {
-                        Iterator<TopicIdPartition> iterator = active.
-                            replicationControl().brokersToIsrs().partitionsWithNoLeader();
-                        assertTrue(iterator.hasNext());
-                        return iterator.next();
-                    });
-                assertEquals(0, topicPartitionFuture.get().partitionId());
-            }
+                        get().topics().find("foo").errorMessage());
+            assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+                active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+                        setWantFence(false).setBrokerEpoch(2L).setBrokerId(0).
+                        setCurrentMetadataOffset(100000L)).get());
+            assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
+                createTopicsRequestData, Collections.singleton("foo")).
+                    get().topics().find("foo").errorCode());
+            CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent(
+                "debugGetPartition", OptionalLong.empty(), () -> {
+                    Iterator<TopicIdPartition> iterator = active.
+                        replicationControl().brokersToIsrs().iterator(0, true);
+                    assertTrue(iterator.hasNext());
+                    return iterator.next();
+                });
+            assertEquals(0, topicPartitionFuture.get().partitionId());
+            active.unregisterBroker(ANONYMOUS_CONTEXT, 0).get();
+            topicPartitionFuture = active.appendReadEvent(
+                "debugGetPartition", OptionalLong.empty(), () -> {
+                    Iterator<TopicIdPartition> iterator = active.
+                        replicationControl().brokersToIsrs().partitionsWithNoLeader();
+                    assertTrue(iterator.hasNext());
+                    return iterator.next();
+                });
+            assertEquals(0, topicPartitionFuture.get().partitionId());
         }
     }
 
@@ -572,7 +593,9 @@ public class QuorumControllerTest {
     }
 
     private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
-            MetadataVersion minVersion, MetadataVersion maxVersion) {
+        MetadataVersion minVersion,
+        MetadataVersion maxVersion
+    ) {
         BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
         features.add(new BrokerRegistrationRequestData.Feature()
             .setName(MetadataVersion.FEATURE_NAME)
@@ -599,71 +622,77 @@ public class QuorumControllerTest {
         Map<Integer, Long> brokerEpochs = new HashMap<>();
         RawSnapshotReader reader = null;
         Uuid fooId;
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
-                logEnv,
-                b -> b.setConfigSchema(SCHEMA),
-                OptionalLong.empty(),
-                OptionalLong.empty(),
-                SIMPLE_BOOTSTRAP)
-            ) {
-                QuorumController active = controlEnv.activeController();
-                for (int i = 0; i < numBrokers; i++) {
-                    BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                        new BrokerRegistrationRequestData().
-                            setBrokerId(i).
-                            setRack(null).
-                            setClusterId(active.clusterId()).
-                            setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
-                            setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                            setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                                setName("PLAINTEXT").setHost("localhost").
-                                setPort(9092 + i)).iterator()))).get();
-                    brokerEpochs.put(i, reply.epoch());
-                }
-                for (int i = 0; i < numBrokers - 1; i++) {
-                    assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                        active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                            setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                            setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-                }
-                CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
-                    new CreateTopicsRequestData().setTopics(
-                        new CreatableTopicCollection(Collections.singleton(
-                            new CreatableTopic().setName("foo").setNumPartitions(-1).
-                                setReplicationFactor((short) -1).
-                                setAssignments(new CreatableReplicaAssignmentCollection(
-                                    Arrays.asList(new CreatableReplicaAssignment().
-                                        setPartitionIndex(0).
-                                        setBrokerIds(Arrays.asList(0, 1, 2)),
-                                    new CreatableReplicaAssignment().
-                                        setPartitionIndex(1).
-                                        setBrokerIds(Arrays.asList(1, 2, 0))).
-                                            iterator()))).iterator())),
-                    Collections.singleton("foo")).get();
-                fooId = fooData.topics().find("foo").topicId();
-                active.allocateProducerIds(ANONYMOUS_CONTEXT,
-                    new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
-                long snapshotLogOffset = active.beginWritingSnapshot().get();
-                reader = logEnv.waitForSnapshot(snapshotLogOffset);
-                SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(reader);
-                assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset());
-                checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot);
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                setBootstrapMetadata(SIMPLE_BOOTSTRAP).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            for (int i = 0; i < numBrokers; i++) {
+                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(i).
+                        setRack(null).
+                        setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
+                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
+                            setName("PLAINTEXT").setHost("localhost").
+                            setPort(9092 + i)).iterator()))).get();
+                brokerEpochs.put(i, reply.epoch());
             }
+            for (int i = 0; i < numBrokers - 1; i++) {
+                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
+            }
+            CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
+                new CreateTopicsRequestData().setTopics(
+                    new CreatableTopicCollection(Collections.singleton(
+                        new CreatableTopic().setName("foo").setNumPartitions(-1).
+                            setReplicationFactor((short) -1).
+                            setAssignments(new CreatableReplicaAssignmentCollection(
+                                Arrays.asList(new CreatableReplicaAssignment().
+                                    setPartitionIndex(0).
+                                    setBrokerIds(Arrays.asList(0, 1, 2)),
+                                new CreatableReplicaAssignment().
+                                    setPartitionIndex(1).
+                                    setBrokerIds(Arrays.asList(1, 2, 0))).
+                                        iterator()))).iterator())),
+                Collections.singleton("foo")).get();
+            fooId = fooData.topics().find("foo").topicId();
+            active.allocateProducerIds(ANONYMOUS_CONTEXT,
+                new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
+            long snapshotLogOffset = active.beginWritingSnapshot().get();
+            reader = logEnv.waitForSnapshot(snapshotLogOffset);
+            SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(reader);
+            assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset());
+            checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot);
         }
 
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(reader))) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
-                QuorumController active = controlEnv.activeController();
-                long snapshotLogOffset = active.beginWritingSnapshot().get();
-                SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(
-                    logEnv.waitForSnapshot(snapshotLogOffset)
-                );
-                assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset());
-                checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot);
-            }
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                setSnapshotReader(reader).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            long snapshotLogOffset = active.beginWritingSnapshot().get();
+            SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(
+                logEnv.waitForSnapshot(snapshotLogOffset)
+            );
+            assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset());
+            checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot);
         }
     }
 
@@ -673,36 +702,110 @@ public class QuorumControllerTest {
         final int maxNewRecordBytes = 4;
         Map<Integer, Long> brokerEpochs = new HashMap<>();
         Uuid fooId;
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-                b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
-                b.setBootstrapMetadata(SIMPLE_BOOTSTRAP);
-            })) {
-                QuorumController active = controlEnv.activeController();
-                for (int i = 0; i < numBrokers; i++) {
-                    BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                        new BrokerRegistrationRequestData().
-                            setBrokerId(i).
-                            setRack(null).
-                            setClusterId(active.clusterId()).
-                            setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
-                            setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                            setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                                setName("PLAINTEXT").setHost("localhost").
-                                setPort(9092 + i)).iterator()))).get();
-                    brokerEpochs.put(i, reply.epoch());
-                }
-                for (int i = 0; i < numBrokers - 1; i++) {
-                    assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                        active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                            setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                            setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-                }
-                CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
-                    new CreateTopicsRequestData().setTopics(
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                    controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
+                    controllerBuilder.setBootstrapMetadata(SIMPLE_BOOTSTRAP);
+                }).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            for (int i = 0; i < numBrokers; i++) {
+                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(i).
+                        setRack(null).
+                        setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
+                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
+                            setName("PLAINTEXT").setHost("localhost").
+                            setPort(9092 + i)).iterator()))).get();
+                brokerEpochs.put(i, reply.epoch());
+            }
+            for (int i = 0; i < numBrokers - 1; i++) {
+                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
+            }
+            CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
+                new CreateTopicsRequestData().setTopics(
+                    new CreatableTopicCollection(Collections.singleton(
+                        new CreatableTopic().setName("foo").setNumPartitions(-1).
+                            setReplicationFactor((short) -1).
+                            setAssignments(new CreatableReplicaAssignmentCollection(
+                                Arrays.asList(new CreatableReplicaAssignment().
+                                    setPartitionIndex(0).
+                                    setBrokerIds(Arrays.asList(0, 1, 2)),
+                                new CreatableReplicaAssignment().
+                                    setPartitionIndex(1).
+                                    setBrokerIds(Arrays.asList(1, 2, 0))).
+                                        iterator()))).iterator())),
+                Collections.singleton("foo")).get();
+            fooId = fooData.topics().find("foo").topicId();
+            active.allocateProducerIds(ANONYMOUS_CONTEXT,
+                    new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
+
+            SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot());
+            checkSnapshotSubcontent(
+                expectedSnapshotContent(fooId, brokerEpochs),
+                snapshot
+            );
+        }
+    }
+
+    @Test
+    public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable {
+        final int numBrokers = 4;
+        final int maxNewRecordBytes = 1000;
+        Map<Integer, Long> brokerEpochs = new HashMap<>();
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                    controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
+                }).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            for (int i = 0; i < numBrokers; i++) {
+                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(i).
+                        setRack(null).
+                        setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
+                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
+                            setName("PLAINTEXT").setHost("localhost").
+                            setPort(9092 + i)).iterator()))).get();
+                brokerEpochs.put(i, reply.epoch());
+                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
+            }
+
+            assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
+                String.format("%s appended bytes is not less than %s max new record bytes",
+                    logEnv.appendedBytes(),
+                    maxNewRecordBytes));
+
+            // Keep creating topic until we reached the max bytes limit
+            int counter = 0;
+            while (logEnv.appendedBytes() < maxNewRecordBytes) {
+                counter += 1;
+                String topicName = String.format("foo-%s", counter);
+                active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(
                         new CreatableTopicCollection(Collections.singleton(
-                            new CreatableTopic().setName("foo").setNumPartitions(-1).
+                            new CreatableTopic().setName(topicName).setNumPartitions(-1).
                                 setReplicationFactor((short) -1).
                                 setAssignments(new CreatableReplicaAssignmentCollection(
                                     Arrays.asList(new CreatableReplicaAssignment().
@@ -712,75 +815,9 @@ public class QuorumControllerTest {
                                         setPartitionIndex(1).
                                         setBrokerIds(Arrays.asList(1, 2, 0))).
                                             iterator()))).iterator())),
-                    Collections.singleton("foo")).get();
-                fooId = fooData.topics().find("foo").topicId();
-                active.allocateProducerIds(ANONYMOUS_CONTEXT,
-                        new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
-
-                SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot());
-                checkSnapshotSubcontent(
-                    expectedSnapshotContent(fooId, brokerEpochs),
-                    snapshot
-                );
-            }
-        }
-    }
-
-    @Test
-    public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable {
-        final int numBrokers = 4;
-        final int maxNewRecordBytes = 1000;
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-                b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
-            })) {
-                QuorumController active = controlEnv.activeController();
-                for (int i = 0; i < numBrokers; i++) {
-                    BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                        new BrokerRegistrationRequestData().
-                            setBrokerId(i).
-                            setRack(null).
-                            setClusterId(active.clusterId()).
-                            setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
-                            setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                            setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                                setName("PLAINTEXT").setHost("localhost").
-                                setPort(9092 + i)).iterator()))).get();
-                    brokerEpochs.put(i, reply.epoch());
-                    assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                        active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                            setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                            setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-                }
-
-                assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
-                    String.format("%s appended bytes is not less than %s max new record bytes",
-                        logEnv.appendedBytes(),
-                        maxNewRecordBytes));
-
-                // Keep creating topic until we reached the max bytes limit
-                int counter = 0;
-                while (logEnv.appendedBytes() < maxNewRecordBytes) {
-                    counter += 1;
-                    String topicName = String.format("foo-%s", counter);
-                    active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(
-                            new CreatableTopicCollection(Collections.singleton(
-                                new CreatableTopic().setName(topicName).setNumPartitions(-1).
-                                    setReplicationFactor((short) -1).
-                                    setAssignments(new CreatableReplicaAssignmentCollection(
-                                        Arrays.asList(new CreatableReplicaAssignment().
-                                            setPartitionIndex(0).
-                                            setBrokerIds(Arrays.asList(0, 1, 2)),
-                                        new CreatableReplicaAssignment().
-                                            setPartitionIndex(1).
-                                            setBrokerIds(Arrays.asList(1, 2, 0))).
-                                                iterator()))).iterator())),
-                        Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
-                }
-                logEnv.waitForLatestSnapshot();
+                    Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
             }
+            logEnv.waitForLatestSnapshot();
         }
     }
 
@@ -912,52 +949,55 @@ public class QuorumControllerTest {
      */
     @Test
     public void testTimeouts() throws Throwable {
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
-                QuorumController controller = controlEnv.activeController();
-                CountDownLatch countDownLatch = controller.pause();
-                long now = controller.time().nanoseconds();
-                ControllerRequestContext context0 = new ControllerRequestContext(
-                    new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
-                CompletableFuture<CreateTopicsResponseData> createFuture =
-                    controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).
-                        setTopics(new CreatableTopicCollection(Collections.singleton(
-                            new CreatableTopic().setName("foo")).iterator())),
-                        Collections.emptySet());
-                CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
-                    controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID));
-                CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
-                    controller.findTopicIds(context0, Collections.singletonList("foo"));
-                CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
-                    controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
-                CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
-                    controller.createPartitions(context0, Collections.singletonList(
-                        new CreatePartitionsTopic()), false);
-                CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
-                    controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).
-                        setTopicPartitions(null));
-                CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
-                    controller.alterPartitionReassignments(context0,
-                        new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).
-                            setTopics(Collections.singletonList(new ReassignableTopic())));
-                CompletableFuture<ListPartitionReassignmentsResponseData> listReassignmentsFuture =
-                    controller.listPartitionReassignments(context0,
-                        new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
-                while (controller.time().nanoseconds() == now) {
-                    Thread.sleep(0, 10);
-                }
-                countDownLatch.countDown();
-                assertYieldsTimeout(createFuture);
-                assertYieldsTimeout(deleteFuture);
-                assertYieldsTimeout(findTopicIdsFuture);
-                assertYieldsTimeout(findTopicNamesFuture);
-                assertYieldsTimeout(createPartitionsFuture);
-                assertYieldsTimeout(electLeadersFuture);
-                assertYieldsTimeout(alterReassignmentsFuture);
-                assertYieldsTimeout(listReassignmentsFuture);
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
+        ) {
+            QuorumController controller = controlEnv.activeController();
+            CountDownLatch countDownLatch = controller.pause();
+            long now = controller.time().nanoseconds();
+            ControllerRequestContext context0 = new ControllerRequestContext(
+                new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
+            CompletableFuture<CreateTopicsResponseData> createFuture =
+                controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).
+                    setTopics(new CreatableTopicCollection(Collections.singleton(
+                        new CreatableTopic().setName("foo")).iterator())),
+                    Collections.emptySet());
+            CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
+                controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID));
+            CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
+                controller.findTopicIds(context0, Collections.singletonList("foo"));
+            CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
+                controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
+            CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
+                controller.createPartitions(context0, Collections.singletonList(
+                    new CreatePartitionsTopic()), false);
+            CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
+                controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).
+                    setTopicPartitions(null));
+            CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
+                controller.alterPartitionReassignments(context0,
+                    new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).
+                        setTopics(Collections.singletonList(new ReassignableTopic())));
+            CompletableFuture<ListPartitionReassignmentsResponseData> listReassignmentsFuture =
+                controller.listPartitionReassignments(context0,
+                    new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
+            while (controller.time().nanoseconds() == now) {
+                Thread.sleep(0, 10);
             }
+            countDownLatch.countDown();
+            assertYieldsTimeout(createFuture);
+            assertYieldsTimeout(deleteFuture);
+            assertYieldsTimeout(findTopicIdsFuture);
+            assertYieldsTimeout(findTopicNamesFuture);
+            assertYieldsTimeout(createPartitionsFuture);
+            assertYieldsTimeout(electLeadersFuture);
+            assertYieldsTimeout(alterReassignmentsFuture);
+            assertYieldsTimeout(listReassignmentsFuture);
         }
     }
 
@@ -972,37 +1012,41 @@ public class QuorumControllerTest {
      */
     @Test
     public void testEarlyControllerResults() throws Throwable {
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
-                QuorumController controller = controlEnv.activeController();
-                CountDownLatch countDownLatch = controller.pause();
-                CompletableFuture<CreateTopicsResponseData> createFuture =
-                    controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
-                        setTimeoutMs(120000), Collections.emptySet());
-                CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
-                    controller.deleteTopics(ANONYMOUS_CONTEXT, Collections.emptyList());
-                CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
-                    controller.findTopicIds(ANONYMOUS_CONTEXT, Collections.emptyList());
-                CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
-                    controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList());
-                CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
-                    controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false);
-                CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
-                    controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
-                CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
-                    controller.alterPartitionReassignments(ANONYMOUS_CONTEXT,
-                        new AlterPartitionReassignmentsRequestData());
-                createFuture.get();
-                deleteFuture.get();
-                findTopicIdsFuture.get();
-                findTopicNamesFuture.get();
-                createPartitionsFuture.get();
-                electLeadersFuture.get();
-                alterReassignmentsFuture.get();
-                countDownLatch.countDown();
-            }
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
+        ) {
+            QuorumController controller = controlEnv.activeController();
+            CountDownLatch countDownLatch = controller.pause();
+            CompletableFuture<CreateTopicsResponseData> createFuture =
+                controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
+                    setTimeoutMs(120000), Collections.emptySet());
+            CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
+                controller.deleteTopics(ANONYMOUS_CONTEXT, Collections.emptyList());
+            CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
+                controller.findTopicIds(ANONYMOUS_CONTEXT, Collections.emptyList());
+            CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
+                controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList());
+            CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
+                controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false);
+            CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
+                controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
+            CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
+                controller.alterPartitionReassignments(ANONYMOUS_CONTEXT,
+                    new AlterPartitionReassignmentsRequestData());
+            createFuture.get();
+            deleteFuture.get();
+            findTopicIdsFuture.get();
+            findTopicNamesFuture.get();
+            createPartitionsFuture.get();
+            electLeadersFuture.get();
+            alterReassignmentsFuture.get();
+            countDownLatch.countDown();
         }
     }
 
@@ -1013,10 +1057,15 @@ public class QuorumControllerTest {
         int numPartitions = 3;
         String topicName = "topic-name";
 
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                }).
+                build();
+        ) {
             QuorumController controller = controlEnv.activeController();
 
             Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
@@ -1165,38 +1214,46 @@ public class QuorumControllerTest {
 
     @Test
     public void testConfigResourceExistenceChecker() throws Throwable {
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
-                QuorumController active = controlEnv.activeController();
-                registerBrokers(active, 5);
-                active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
-                    setTopics(new CreatableTopicCollection(Collections.singleton(
-                        new CreatableTopic().setName("foo").
-                            setReplicationFactor((short) 3).
-                            setNumPartitions(1)).iterator())),
-                    Collections.singleton("foo")).get();
-                ConfigResourceExistenceChecker checker =
-                    active.new ConfigResourceExistenceChecker();
-                // A ConfigResource with type=BROKER and name=(empty string) represents
-                // the default broker resource. It is used to set cluster configs.
-                checker.accept(new ConfigResource(BROKER, ""));
-
-                // Broker 3 exists, so we can set a configuration for it.
-                checker.accept(new ConfigResource(BROKER, "3"));
-
-                // Broker 10 does not exist, so this should throw an exception.
-                assertThrows(BrokerIdNotRegisteredException.class,
-                    () -> checker.accept(new ConfigResource(BROKER, "10")));
-
-                // Topic foo exists, so we can set a configuration for it.
-                checker.accept(new ConfigResource(TOPIC, "foo"));
-
-                // Topic bar does not exist, so this should throw an exception.
-                assertThrows(UnknownTopicOrPartitionException.class,
-                    () -> checker.accept(new ConfigResource(TOPIC, "bar")));
-            }
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(
+                    controllerBuilder -> controllerBuilder.setConfigSchema(SCHEMA)).
+                build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+
+            // Prepare the state the checker is queried against: brokers registered
+            // through registerBrokers(active, 5) and a single topic named "foo".
+            registerBrokers(active, 5);
+            CreatableTopic fooTopic = new CreatableTopic().
+                setName("foo").
+                setReplicationFactor((short) 3).
+                setNumPartitions(1);
+            CreateTopicsRequestData createFoo = new CreateTopicsRequestData().
+                setTopics(new CreatableTopicCollection(
+                    Collections.singleton(fooTopic).iterator()));
+            active.createTopics(ANONYMOUS_CONTEXT, createFoo,
+                Collections.singleton("foo")).get();
+
+            ConfigResourceExistenceChecker checker =
+                active.new ConfigResourceExistenceChecker();
+
+            // type=BROKER with an empty name is the default broker resource, which is
+            // used to set cluster configs; the checker is expected to accept it.
+            checker.accept(new ConfigResource(BROKER, ""));
+
+            // Broker 3 exists, so it is accepted...
+            checker.accept(new ConfigResource(BROKER, "3"));
+            // ...whereas broker 10 does not exist, so the checker must throw.
+            assertThrows(BrokerIdNotRegisteredException.class,
+                () -> checker.accept(new ConfigResource(BROKER, "10")));
+
+            // Topic foo exists, so it is accepted...
+            checker.accept(new ConfigResource(TOPIC, "foo"));
+            // ...whereas topic bar does not exist, so the checker must throw.
+            assertThrows(UnknownTopicOrPartitionException.class,
+                () -> checker.accept(new ConfigResource(TOPIC, "bar")));
         }
     }
 
@@ -1214,18 +1267,22 @@ public class QuorumControllerTest {
             authorizer.configure(Collections.emptyMap());
             authorizers.add(authorizer);
         }
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(
-            numControllers,
-            Optional.empty(),
-            shared -> {
-                shared.setInitialMaxReadOffset(2);
-            }
-        )) {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(numControllers).
+                setSharedLogDataInitializer(sharedLogData -> {
+                    sharedLogData.setInitialMaxReadOffset(2);
+                }).
+                build()
+        ) {
             logEnv.appendInitialRecords(expectedSnapshotContent(FOO_ID, ALL_ZERO_BROKER_EPOCHS));
             logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2));
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setAuthorizer(authorizers.get(b.nodeId()));
-            })) {
+            try (
+                QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                    setControllerBuilderInitializer(controllerBuilder -> {
+                        controllerBuilder.setAuthorizer(authorizers.get(controllerBuilder.nodeId()));
+                    }).
+                    build()
+            ) {
                 assertInitialLoadFuturesNotComplete(authorizers);
                 logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE);
                 QuorumController active = controlEnv.activeController();
@@ -1241,25 +1298,38 @@ public class QuorumControllerTest {
 
     @Test
     public void testFatalMetadataReplayErrorOnActive() throws Throwable {
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-            })) {
-                QuorumController active = controlEnv.activeController();
-                CompletableFuture<Void> future = active.appendWriteEvent("errorEvent",
-                        OptionalLong.empty(), () -> {
-                            return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
-                                    new ConfigRecord().
-                                            setName(null).
-                                            setResourceName(null).
-                                            setResourceType((byte) 255).
-                                            setValue(null), (short) 0)), null);
-                        });
-                assertThrows(ExecutionException.class, () -> future.get());
-                assertEquals(NullPointerException.class,
-                        controlEnv.fatalFaultHandler().firstException().getCause().getClass());
-                controlEnv.fatalFaultHandler().setIgnore(true);
-                controlEnv.metadataFaultHandler().setIgnore(true);
-            }
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
+            QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv.Builder(logEnv).build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+
+            // Have the active controller emit a ConfigRecord whose name, resource name
+            // and value are null; the assertions below expect this to end in a fault.
+            CompletableFuture<Void> future = active.appendWriteEvent(
+                "errorEvent",
+                OptionalLong.empty(),
+                () -> {
+                    ConfigRecord badRecord = new ConfigRecord().
+                        setName(null).
+                        setResourceName(null).
+                        setResourceType((byte) 255).
+                        setValue(null);
+                    return ControllerResult.of(Collections.singletonList(
+                        new ApiMessageAndVersion(badRecord, (short) 0)), null);
+                });
+
+            // The caller sees the failure through the future, and the first exception
+            // recorded by the fatal fault handler is caused by a NullPointerException.
+            assertThrows(ExecutionException.class, () -> future.get());
+            assertEquals(NullPointerException.class,
+                controlEnv.fatalFaultHandler().firstException().getCause().getClass());
+
+            // NOTE(review): setIgnore(true) presumably keeps the expected fault from
+            // failing the test when controlEnv is closed -- confirm in MockFaultHandler.
+            controlEnv.fatalFaultHandler().setIgnore(true);
+            controlEnv.metadataFaultHandler().setIgnore(true);
         }
     }
 
@@ -1323,60 +1382,74 @@ public class QuorumControllerTest {
 
     @Test
     public void testUpgradeFromPreProductionVersion() throws Exception {
-        try (InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS)) {
-            try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(
-                    FileRawSnapshotReader.open(initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0)))
-            )) {
-                try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                    b.setConfigSchema(SCHEMA);
-                }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) {
-                    QuorumController active = controlEnv.activeController();
-                    TestUtils.waitForCondition(() ->
-                        active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1),
-                        "Failed to get a metadata version of " + MetadataVersion.IBP_3_0_IV1);
-                    // The ConfigRecord in our bootstrap should not have been applied, since there
-                    // were already records present.
-                    assertEquals(Collections.emptyMap(), active.configurationControl().
-                            getConfigs(new ConfigResource(BROKER, "")));
-                }
-            }
+        try (
+            InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS);
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                setSnapshotReader(FileRawSnapshotReader.open(
+                    initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0))).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(
+                    controllerBuilder -> controllerBuilder.setConfigSchema(SCHEMA)).
+                setBootstrapMetadata(COMPLEX_BOOTSTRAP).
+                build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+
+            // Wait for the active controller to report the IBP_3_0_IV1 metadata version.
+            MetadataVersion expectedVersion = MetadataVersion.IBP_3_0_IV1;
+            TestUtils.waitForCondition(
+                () -> active.featureControl().metadataVersion().equals(expectedVersion),
+                "Failed to get a metadata version of " + expectedVersion);
+
+            // Records were already present, so the ConfigRecord in our bootstrap should
+            // not have been applied: the default broker resource must have no configs.
+            ConfigResource defaultBrokerResource = new ConfigResource(BROKER, "");
+            assertEquals(Collections.emptyMap(),
+                active.configurationControl().getConfigs(defaultBrokerResource));
         }
     }
 
     @Test
     public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(
+                    controllerBuilder -> controllerBuilder.setConfigSchema(SCHEMA)).
+                setBootstrapMetadata(COMPLEX_BOOTSTRAP).
+                build()
         ) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) {
-                QuorumController active = controlEnv.activeController();
+            QuorumController active = controlEnv.activeController();
 
-                ControllerRequestContext ctx = new ControllerRequestContext(
-                    new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE));
+            ControllerRequestContext ctx = new ControllerRequestContext(
+                new RequestHeaderData(),
+                KafkaPrincipal.ANONYMOUS,
+                OptionalLong.of(Long.MAX_VALUE));
 
-                TestUtils.waitForCondition(() -> {
-                    FinalizedControllerFeatures features = active.finalizedFeatures(ctx).get();
-                    Optional<Short> metadataVersionOpt = features.get(MetadataVersion.FEATURE_NAME);
-                    return Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel()).equals(metadataVersionOpt);
-                }, "Failed to see expected metadata version from bootstrap metadata");
+            // Wait until the finalized metadata version feature is at the IBP_3_3_IV1
+            // level that this test expects from the bootstrap metadata.
+            Optional<Short> expectedLevel =
+                Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel());
+            TestUtils.waitForCondition(
+                () -> expectedLevel.equals(
+                    active.finalizedFeatures(ctx).get().get(MetadataVersion.FEATURE_NAME)),
+                "Failed to see expected metadata version from bootstrap metadata");
 
-                TestUtils.waitForCondition(() -> {
-                    ConfigResource defaultBrokerResource = new ConfigResource(BROKER, "");
+            // Wait until describing the default broker resource yields exactly foo=bar.
+            TestUtils.waitForCondition(() -> {
+                ConfigResource defaultBrokerResource = new ConfigResource(BROKER, "");
 
-                    Map<ConfigResource, Collection<String>> configs = Collections.singletonMap(
-                        defaultBrokerResource,
-                        Collections.emptyList()
-                    );
+                Map<ConfigResource, Collection<String>> configs =
+                    Collections.singletonMap(defaultBrokerResource, Collections.emptyList());
 
-                    Map<ConfigResource, ResultOrError<Map<String, String>>> results =
-                        active.describeConfigs(ctx, configs).get();
+                ResultOrError<Map<String, String>> resultOrError =
+                    active.describeConfigs(ctx, configs).get().get(defaultBrokerResource);
 
-                    ResultOrError<Map<String, String>> resultOrError = results.get(defaultBrokerResource);
-                    return resultOrError.isResult() &&
-                        Collections.singletonMap("foo", "bar").equals(resultOrError.result());
-                }, "Failed to see expected config change from bootstrap metadata");
-            }
+                Map<String, String> expectedConfig = Collections.singletonMap("foo", "bar");
+                return resultOrError.isResult() &&
+                    expectedConfig.equals(resultOrError.result());
+            }, "Failed to see expected config change from bootstrap metadata");
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index fd56ef6a644..993ba848aad 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.controller.QuorumController.Builder;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
@@ -44,28 +43,59 @@ public class QuorumControllerTestEnv implements AutoCloseable {
     private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
     private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
 
-    public QuorumControllerTestEnv(
-        LocalLogManagerTestEnv logEnv,
-        Consumer<QuorumController.Builder> builderConsumer
-    ) throws Exception {
-        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(),
-                BootstrapMetadata.fromVersion(MetadataVersion.latest(), "test-provided version"));
-    }
+    /**
+     * Builder for {@link QuorumControllerTestEnv}. The log environment is required;
+     * every other setting starts from the default assigned in its field initializer.
+     */
+    public static class Builder {
+        private final LocalLogManagerTestEnv logEnv;
+        private Consumer<QuorumController.Builder> controllerBuilderInitializer = __ -> { };
+        private OptionalLong sessionTimeoutMillis = OptionalLong.empty();
+        private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
+        private BootstrapMetadata bootstrapMetadata =
+            BootstrapMetadata.fromVersion(MetadataVersion.latest(), "test-provided version");
 
-    public QuorumControllerTestEnv(
-            LocalLogManagerTestEnv logEnv,
-            Consumer<Builder> builderConsumer,
-            OptionalLong sessionTimeoutMillis,
-            OptionalLong leaderImbalanceCheckIntervalNs,
-            MetadataVersion metadataVersion
-    ) throws Exception {
-        this(logEnv, builderConsumer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs,
-                BootstrapMetadata.fromVersion(metadataVersion, "test-provided version"));
+        public Builder(LocalLogManagerTestEnv logEnv) {
+            this.logEnv = logEnv;
+        }
+
+        /**
+         * Sets a callback that is handed the {@link QuorumController.Builder} of a
+         * controller just before that controller is built.
+         */
+        public Builder setControllerBuilderInitializer(Consumer<QuorumController.Builder> controllerBuilderInitializer) {
+            this.controllerBuilderInitializer = controllerBuilderInitializer;
+            return this;
+        }
+
+        /** Overrides the session timeout passed to the environment (default: empty). */
+        public Builder setSessionTimeoutMillis(OptionalLong sessionTimeoutMillis) {
+            this.sessionTimeoutMillis = sessionTimeoutMillis;
+            return this;
+        }
+
+        /** Overrides the leader imbalance check interval passed on (default: empty). */
+        public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong leaderImbalanceCheckIntervalNs) {
+            this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
+            return this;
+        }
+
+        /** Replaces the default bootstrap metadata (latest metadata version). */
+        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
+            this.bootstrapMetadata = bootstrapMetadata;
+            return this;
+        }
+
+        /** Creates the test environment from the current builder settings. */
+        public QuorumControllerTestEnv build() throws Exception {
+            return new QuorumControllerTestEnv(logEnv, controllerBuilderInitializer,
+                sessionTimeoutMillis, leaderImbalanceCheckIntervalNs, bootstrapMetadata);
+        }
     }
 
-    public QuorumControllerTestEnv(
+    private QuorumControllerTestEnv(
         LocalLogManagerTestEnv logEnv,
-        Consumer<Builder> builderConsumer,
+        Consumer<QuorumController.Builder> controllerBuilderInitializer,
         OptionalLong sessionTimeoutMillis,
         OptionalLong leaderImbalanceCheckIntervalNs,
         BootstrapMetadata bootstrapMetadata
@@ -87,7 +109,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
                 });
                 builder.setFatalFaultHandler(fatalFaultHandler);
                 builder.setMetadataFaultHandler(metadataFaultHandler);
-                builderConsumer.accept(builder);
+                controllerBuilderInitializer.accept(builder);
                 this.controllers.add(builder.build());
             }
         } catch (Exception e) {
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index 7b5e26d79f6..066ca4e000d 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -26,7 +26,6 @@ import org.junit.jupiter.api.Timeout;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.stream.Collectors;
 
@@ -44,8 +43,10 @@ public class LocalLogManagerTest {
      */
     @Test
     public void testCreateAndClose() throws Exception {
-        try (LocalLogManagerTestEnv env =
-                 LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) {
+        try (
+            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).
+                buildWithMockListeners();
+        ) {
             env.close();
             assertEquals(null, env.firstError.get());
         }
@@ -56,8 +57,10 @@ public class LocalLogManagerTest {
      */
     @Test
     public void testClaimsLeadership() throws Exception {
-        try (LocalLogManagerTestEnv env =
-                 LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) {
+        try (
+            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).
+                buildWithMockListeners();
+        ) {
             assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader());
             env.close();
             assertEquals(null, env.firstError.get());
@@ -69,8 +72,10 @@ public class LocalLogManagerTest {
      */
     @Test
     public void testPassLeadership() throws Exception {
-        try (LocalLogManagerTestEnv env =
-                 LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) {
+        try (
+            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).
+                buildWithMockListeners();
+        ) {
             LeaderAndEpoch first = env.waitForLeader();
             LeaderAndEpoch cur = first;
             do {
@@ -123,8 +128,10 @@ public class LocalLogManagerTest {
      */
     @Test
     public void testCommits() throws Exception {
-        try (LocalLogManagerTestEnv env =
-                 LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) {
+        try (
+            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).
+                buildWithMockListeners();
+        ) {
             LeaderAndEpoch leaderInfo = env.waitForLeader();
             int leaderId = leaderInfo.leaderId().orElseThrow(() ->
                 new AssertionError("Current leader is undefined")
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 17c9c467124..1693b62be1a 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -66,31 +66,71 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
      */
     private final List<LocalLogManager> logManagers;
 
-    public static LocalLogManagerTestEnv createWithMockListeners(
-        int numManagers,
-        Optional<RawSnapshotReader> snapshot
-    ) throws Exception {
-        LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers, snapshot);
-        try {
-            for (LocalLogManager logManager : testEnv.logManagers) {
-                logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt()));
+    /**
+     * Builder for {@link LocalLogManagerTestEnv}. Only the number of log managers is
+     * required; by default there is no snapshot reader and the shared log data
+     * initializer does nothing.
+     */
+    public static class Builder {
+        private final int numManagers;
+        private Optional<RawSnapshotReader> snapshotReader = Optional.empty();
+        private Consumer<SharedLogData> sharedLogDataInitializer = __ -> { };
+
+        public Builder(int numManagers) {
+            this.numManagers = numManagers;
+        }
+
+        /** Sets the snapshot reader; a null argument is rejected by {@code Optional.of}. */
+        public Builder setSnapshotReader(RawSnapshotReader snapshotReader) {
+            this.snapshotReader = Optional.of(snapshotReader);
+            return this;
+        }
+
+        /**
+         * Sets a callback that receives the environment's {@link SharedLogData}
+         * right after it has been constructed.
+         */
+        public Builder setSharedLogDataInitializer(Consumer<SharedLogData> sharedLogDataInitializer) {
+            this.sharedLogDataInitializer = sharedLogDataInitializer;
+            return this;
+        }
+
+        /** Creates the environment from the current builder settings. */
+        public LocalLogManagerTestEnv build() {
+            return new LocalLogManagerTestEnv(numManagers, snapshotReader, sharedLogDataInitializer);
+        }
+
+        /**
+         * Creates the environment and registers a {@link MockMetaLogManagerListener}
+         * with every log manager. If registration fails, the environment is closed
+         * (a failure to close is only logged) and the original exception is rethrown.
+         */
+        public LocalLogManagerTestEnv buildWithMockListeners() {
+            LocalLogManagerTestEnv env = build();
+            try {
+                env.logManagers.forEach(logManager -> logManager.register(
+                    new MockMetaLogManagerListener(logManager.nodeId().getAsInt())));
+            } catch (Exception e) {
+                try {
+                    env.close();
+                } catch (Exception t) {
+                    log.error("Error while closing new log environment", t);
+                }
+                throw e;
             }
-        } catch (Exception e) {
-            testEnv.close();
-            throw e;
+            return env;
         }
-        return testEnv;
     }
 
-    public LocalLogManagerTestEnv(
+    private LocalLogManagerTestEnv(
         int numManagers,
-        Optional<RawSnapshotReader> snapshot,
-        Consumer<SharedLogData> dataSetup
-    ) throws Exception {
+        Optional<RawSnapshotReader> snapshotReader,
+        Consumer<SharedLogData> sharedLogDataInitializer
+    ) {
         clusterId = Uuid.randomUuid().toString();
         dir = TestUtils.tempDirectory();
-        shared = new SharedLogData(snapshot);
-        dataSetup.accept(shared);
+        shared = new SharedLogData(snapshotReader);
+        sharedLogDataInitializer.accept(shared);
         List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
         try {
             for (int nodeId = 0; nodeId < numManagers; nodeId++) {
@@ -112,13 +140,6 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
         this.logManagers = newLogManagers;
     }
 
-    public LocalLogManagerTestEnv(
-        int numManagers,
-        Optional<RawSnapshotReader> snapshot
-    ) throws Exception {
-        this(numManagers, snapshot, __ -> { });
-    }
-
     /**
      * Append some records to the log. This method is meant to be called before the
      * controllers are started, to simulate a pre-existing metadata log.