You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/05/17 23:51:37 UTC

[kafka] branch trunk updated: KAFKA-12788: improve KRaft replica placement (#10494)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e5b77f  KAFKA-12788: improve KRaft replica placement (#10494)
9e5b77f is described below

commit 9e5b77fb9687c319dd2b188d9669ca2bd01e9bb8
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon May 17 16:49:47 2021 -0700

    KAFKA-12788: improve KRaft replica placement (#10494)
    
    Implement a striped replica placement algorithm for KRaft. This also
    means implementing rack awareness.  Previously, KRaft just chose
    replicas randomly in a non-rack-aware fashion.  Also, allow replicas to
    be placed on fenced brokers if there are no other choices.  This was
    specified in KIP-631 but previously not implemented.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 checkstyle/import-control.xml                      |   1 +
 .../kafka/controller/BrokerHeartbeatManager.java   |  18 +-
 .../kafka/controller/ClusterControlManager.java    |  10 +-
 .../apache/kafka/controller/QuorumController.java  |  13 +-
 ...licaPlacementPolicy.java => ReplicaPlacer.java} |  10 +-
 .../controller/ReplicationControlManager.java      |   2 +-
 .../controller/SimpleReplicaPlacementPolicy.java   |  78 ----
 .../kafka/controller/StripedReplicaPlacer.java     | 429 +++++++++++++++++++++
 ...leBroker.java => OptionalStringComparator.java} |  49 +--
 .../org/apache/kafka/metadata/UsableBroker.java    |  15 +-
 .../controller/BrokerHeartbeatManagerTest.java     |  14 +-
 .../controller/ClusterControlManagerTest.java      |   8 +-
 .../kafka/controller/QuorumControllerTest.java     |   4 +-
 .../controller/ReplicationControlManagerTest.java  |  13 +-
 .../kafka/controller/StripedReplicaPlacerTest.java | 213 ++++++++++
 .../metadata/OptionalStringComparatorTest.java}    |  51 +--
 .../kafkatest/tests/core/round_trip_fault_test.py  |   5 +
 17 files changed, 738 insertions(+), 195 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c98cfab..7cf51eb 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -233,6 +233,7 @@
     <allow pkg="org.apache.kafka.common.message" />
     <allow pkg="org.apache.kafka.common.metadata" />
     <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.test" />
   </subpackage>
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index d78ac88..f5ed4a2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.metadata.UsableBroker;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -272,6 +273,11 @@ public class BrokerHeartbeatManager {
         return unfenced;
     }
 
+    // VisibleForTesting
+    /**
+     * Returns the heartbeat state of every broker tracked by this manager, including
+     * fenced brokers.  This is the {@code brokers} map's values view, not a copy.
+     */
+    Collection<BrokerHeartbeatState> brokers() {
+        return brokers.values();
+    }
+
     /**
      * Mark a broker as fenced.
      *
@@ -439,7 +445,7 @@ public class BrokerHeartbeatManager {
      * @param numPartitions     The number of partitions to place.
      * @param numReplicas       The number of replicas for each partition.
      * @param idToRack          A function mapping broker id to broker rack.
-     * @param policy            The replica placement policy to use.
+     * @param placer            The replica placer to use.
      *
      * @return                  A list of replica lists.
      *
@@ -449,12 +455,10 @@ public class BrokerHeartbeatManager {
                                       int numPartitions,
                                       short numReplicas,
                                       Function<Integer, Optional<String>> idToRack,
-                                      ReplicaPlacementPolicy policy) {
-        // TODO: support using fenced brokers here if necessary to get to the desired
-        // number of replicas. We probably need to add a fenced boolean in UsableBroker.
+                                      ReplicaPlacer placer) {
         Iterator<UsableBroker> iterator = new UsableBrokerIterator(
-            unfenced.iterator(), idToRack);
-        return policy.createPlacement(startPartition, numPartitions, numReplicas, iterator);
+            brokers.values().iterator(), idToRack);
+        return placer.place(startPartition, numPartitions, numReplicas, iterator);
     }
 
     static class UsableBrokerIterator implements Iterator<UsableBroker> {
@@ -482,7 +486,7 @@ public class BrokerHeartbeatManager {
                 result = iterator.next();
             } while (result.shuttingDown());
             Optional<String> rack = idToRack.apply(result.id());
-            next = new UsableBroker(result.id(), rack);
+            next = new UsableBroker(result.id(), rack, result.fenced());
             return true;
         }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 6d34024..db63943 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -103,9 +103,9 @@ public class ClusterControlManager {
     private final long sessionTimeoutNs;
 
     /**
-     * The replica placement policy to use.
+     * The replica placer to use.
      */
-    private final ReplicaPlacementPolicy placementPolicy;
+    private final ReplicaPlacer replicaPlacer;
 
     /**
      * Maps broker IDs to broker registrations.
@@ -127,12 +127,12 @@ public class ClusterControlManager {
                           Time time,
                           SnapshotRegistry snapshotRegistry,
                           long sessionTimeoutNs,
-                          ReplicaPlacementPolicy placementPolicy) {
+                          ReplicaPlacer replicaPlacer) {
         this.logContext = logContext;
         this.log = logContext.logger(ClusterControlManager.class);
         this.time = time;
         this.sessionTimeoutNs = sessionTimeoutNs;
-        this.placementPolicy = placementPolicy;
+        this.replicaPlacer = replicaPlacer;
         this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
         this.heartbeatManager = null;
         this.readyBrokersFuture = Optional.empty();
@@ -317,7 +317,7 @@ public class ClusterControlManager {
             throw new RuntimeException("ClusterControlManager is not active.");
         }
         return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas,
-            id -> brokerRegistrations.get(id).rack(), placementPolicy);
+            id -> brokerRegistrations.get(id).rack(), replicaPlacer);
     }
 
     public boolean unfenced(int brokerId) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 5cb5efc..94cced9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -124,8 +124,7 @@ public final class QuorumController implements Controller {
         private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
         private short defaultReplicationFactor = 3;
         private int defaultNumPartitions = 1;
-        private ReplicaPlacementPolicy replicaPlacementPolicy =
-            new SimpleReplicaPlacementPolicy(new Random());
+        private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
         private Function<Long, SnapshotWriter> snapshotWriterBuilder;
         private SnapshotReader snapshotReader;
         private long sessionTimeoutNs = NANOSECONDS.convert(18, TimeUnit.SECONDS);
@@ -175,8 +174,8 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setReplicaPlacementPolicy(ReplicaPlacementPolicy replicaPlacementPolicy) {
-            this.replicaPlacementPolicy = replicaPlacementPolicy;
+        /**
+         * Sets the replica placer the controller uses when assigning replicas of new
+         * partitions to brokers.  If this is never called, a {@code StripedReplicaPlacer}
+         * seeded with a new {@code Random} is used.
+         *
+         * @param replicaPlacer     The replica placer to use.
+         * @return                  This builder.
+         */
+        public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
+            this.replicaPlacer = replicaPlacer;
             return this;
         }
 
@@ -226,7 +225,7 @@ public final class QuorumController implements Controller {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
                 return new QuorumController(logContext, nodeId, queue, time, configDefs,
                     logManager, supportedFeatures, defaultReplicationFactor,
-                    defaultNumPartitions, replicaPlacementPolicy, snapshotWriterBuilder,
+                    defaultNumPartitions, replicaPlacer, snapshotWriterBuilder,
                     snapshotReader, sessionTimeoutNs, controllerMetrics);
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -913,7 +912,7 @@ public final class QuorumController implements Controller {
                              Map<String, VersionRange> supportedFeatures,
                              short defaultReplicationFactor,
                              int defaultNumPartitions,
-                             ReplicaPlacementPolicy replicaPlacementPolicy,
+                             ReplicaPlacer replicaPlacer,
                              Function<Long, SnapshotWriter> snapshotWriterBuilder,
                              SnapshotReader snapshotReader,
                              long sessionTimeoutNs,
@@ -930,7 +929,7 @@ public final class QuorumController implements Controller {
             snapshotRegistry, configDefs);
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
         this.clusterControl = new ClusterControlManager(logContext, time,
-            snapshotRegistry, sessionTimeoutNs, replicaPlacementPolicy);
+            snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
         this.replicationControl = new ReplicationControlManager(snapshotRegistry,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacementPolicy.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java
similarity index 86%
rename from metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacementPolicy.java
rename to metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java
index 097463f..9a705f4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacementPolicy.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java
@@ -28,7 +28,7 @@ import org.apache.kafka.metadata.UsableBroker;
  * The interface which a Kafka replica placement policy must implement.
  */
 @InterfaceStability.Unstable
-interface ReplicaPlacementPolicy {
+interface ReplicaPlacer {
     /**
      * Create a new replica placement.
      *
@@ -42,9 +42,9 @@ interface ReplicaPlacementPolicy {
      *
      * @throws InvalidReplicationFactorException    If too many replicas were requested.
      */
-    List<List<Integer>> createPlacement(int startPartition,
-                                        int numPartitions,
-                                        short numReplicas,
-                                        Iterator<UsableBroker> iterator)
+    List<List<Integer>> place(int startPartition,
+                              int numPartitions,
+                              short numReplicas,
+                              Iterator<UsableBroker> iterator)
         throws InvalidReplicationFactorException;
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index a68cc22..3820642 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -529,7 +529,7 @@ public class ReplicationControlManager {
             } catch (InvalidReplicationFactorException e) {
                 return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                     "Unable to replicate the partition " + replicationFactor +
-                        " times: " + e.getMessage());
+                        " time(s): " + e.getMessage());
             }
         }
         Uuid topicId = Uuid.randomUuid();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/SimpleReplicaPlacementPolicy.java b/metadata/src/main/java/org/apache/kafka/controller/SimpleReplicaPlacementPolicy.java
deleted file mode 100644
index a2f7c89..0000000
--- a/metadata/src/main/java/org/apache/kafka/controller/SimpleReplicaPlacementPolicy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.kafka.common.errors.InvalidReplicationFactorException;
-import org.apache.kafka.metadata.UsableBroker;
-
-
-/**
- * A simple uniformly random placement policy.
- *
- * TODO: implement the current "striped" placement policy, plus rack aware placement
- * policies, etc.
- */
-public class SimpleReplicaPlacementPolicy implements ReplicaPlacementPolicy {
-    private final Random random;
-
-    public SimpleReplicaPlacementPolicy(Random random) {
-        this.random = random;
-    }
-
-    @Override
-    public List<List<Integer>> createPlacement(int startPartition,
-                                               int numPartitions,
-                                               short numReplicas,
-                                               Iterator<UsableBroker> iterator) {
-        List<UsableBroker> usable = new ArrayList<>();
-        while (iterator.hasNext()) {
-            usable.add(iterator.next());
-        }
-        if (usable.size() < numReplicas) {
-            throw new InvalidReplicationFactorException("there are only " + usable.size() +
-                " usable brokers");
-        }
-        List<List<Integer>> results = new ArrayList<>();
-        for (int p = 0; p < numPartitions; p++) {
-            List<Integer> choices = new ArrayList<>();
-            // TODO: rack-awareness
-            List<Integer> indexes = new ArrayList<>();
-            int initialIndex = random.nextInt(usable.size());
-            for (int i = 0; i < numReplicas; i++) {
-                indexes.add((initialIndex + i) % usable.size());
-            }
-            indexes.sort(Integer::compareTo);
-            Iterator<UsableBroker> iter = usable.iterator();
-            for (int i = 0; choices.size() < indexes.size(); i++) {
-                int brokerId = iter.next().id();
-                if (indexes.get(choices.size()) == i) {
-                    choices.add(brokerId);
-                }
-            }
-            Collections.shuffle(choices, random);
-            results.add(choices);
-        }
-        return results;
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
new file mode 100644
index 0000000..a2aadc5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  If any specific broker is
+ * fenced, we would like the new leaders to distributed evenly across the remaining
+ * brokers.
+ *
+ * However, we treat the rack placement goal as higher priority than this goal-- if you
+ * configure 10 brokers in rack A and B, and 1 broker in rack C, you will end up with a
+ * lot of partitions on that one broker in rack C.  If you were to place a lot of
+ * partitions with replication factor 3, each partition would try to get a replica there.
+ * In general racks are supposed to be about the same size -- if they aren't, this is a
+ * user error.
+ *
+ * Finally, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced replica.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+
+        /**
+         * How many brokers we have retrieved from the list during the current iteration epoch.
+         */
+        private int index = 0;
+
+        /**
+         * The offset to add to the index in order to calculate the list entry to fetch.  The
+         * addition is done modulo the list size.
+         */
+        private int offset = 0;
+
+        /**
+         * The last known iteration epoch. If we call next with a different epoch than this, the
+         * index and offset will be reset.
+         */
+        private int epoch = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);
+        }
+
+        /**
+         * @return          The number of brokers in this list.
+         */
+        int size() {
+            return brokers.size();
+        }
+
+        /**
+         * Get the next broker in this list, or -1 if there are no more elements to be
+         * returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int next(int epoch) {
+            if (brokers.size() == 0) return -1;
+            if (this.epoch != epoch) {
+                this.epoch = epoch;
+                this.index = 0;
+                this.offset = (offset + 1) % brokers.size();
+            }
+            if (index >= brokers.size()) return -1;
+            int broker = brokers.get((index + offset) % brokers.size());
+            index++;
+            return broker;
+        }
+    }
+
+    /**
+     * A rack in the cluster, which contains brokers.
+     */
+    static class Rack {
+        private final BrokerList fenced = new BrokerList();
+        private final BrokerList unfenced = new BrokerList();
+
+        /**
+         * Initialize this rack.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            fenced.initialize(random);
+            unfenced.initialize(random);
+        }
+
+        void shuffle(Random random) {
+            fenced.shuffle(random);
+            unfenced.shuffle(random);
+        }
+
+        BrokerList fenced() {
+            return fenced;
+        }
+
+        BrokerList unfenced() {
+            return unfenced;
+        }
+
+        /**
+         * Get the next unfenced broker in this rack, or -1 if there are no more brokers
+         * to be returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int nextUnfenced(int epoch) {
+            return unfenced.next(epoch);
+        }
+
+        /**
+         * Get the next broker in this rack, or -1 if there are no more brokers to be
+         * returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int next(int epoch) {
+            int result = unfenced.next(epoch);
+            if (result >= 0) return result;
+            return fenced.next(epoch);
+        }
+    }
+
+    /**
+     * A list of racks that we can iterate through.
+     */
+    static class RackList {
+        /**
+         * The random number generator.
+         */
+        private final Random random;
+
+        /**
+         * A map from rack names to the brokers contained within them.
+         */
+        private final Map<Optional<String>, Rack> racks = new HashMap<>();
+
+        /**
+         * The names of all the racks in the cluster.
+         *
+         * Racks which have at least one unfenced broker come first (in sorted order),
+         * followed by racks which have only fenced brokers (also in sorted order).
+         */
+        private final List<Optional<String>> rackNames = new ArrayList<>();
+
+        /**
+         * The total number of brokers in the cluster, both fenced and unfenced.
+         */
+        private final int numTotalBrokers;
+
+        /**
+         * The total number of unfenced brokers in the cluster.
+         */
+        private final int numUnfencedBrokers;
+
+        /**
+         * The iteration epoch.
+         */
+        private int epoch = 0;
+
+        /**
+         * The offset we use to determine which rack is returned first.
+         */
+        private int offset;
+
+        RackList(Random random, Iterator<UsableBroker> iterator) {
+            this.random = random;
+            int numTotalBrokersCount = 0, numUnfencedBrokersCount = 0;
+            while (iterator.hasNext()) {
+                UsableBroker broker = iterator.next();
+                Rack rack = racks.get(broker.rack());
+                if (rack == null) {
+                    rackNames.add(broker.rack());
+                    rack = new Rack();
+                    racks.put(broker.rack(), rack);
+                }
+                if (broker.fenced()) {
+                    rack.fenced().add(broker.id());
+                } else {
+                    numUnfencedBrokersCount++;
+                    rack.unfenced().add(broker.id());
+                }
+                numTotalBrokersCount++;
+            }
+            for (Rack rack : racks.values()) {
+                rack.initialize(random);
+            }
+            this.rackNames.sort(OptionalStringComparator.INSTANCE);
+            this.numTotalBrokers = numTotalBrokersCount;
+            this.numUnfencedBrokers = numUnfencedBrokersCount;
+            this.offset = rackNames.isEmpty() ? 0 : random.nextInt(rackNames.size());
+        }
+
+        int numTotalBrokers() {
+            return numTotalBrokers;
+        }
+
+        int numUnfencedBrokers() {
+            return numUnfencedBrokers;
+        }
+
+        // VisibleForTesting
+        List<Optional<String>> rackNames() {
+            return rackNames;
+        }
+
+        List<Integer> place(int replicationFactor) {
+            if (replicationFactor <= 0) {
+                throw new InvalidReplicationFactorException("Invalid replication factor " +
+                        replicationFactor + ": the replication factor must be positive.");
+            }
+            // If we have returned as many assignments as there are unfenced brokers in
+            // the cluster, shuffle the rack list and broker lists to try to avoid
+            // repeating the same assignments again.
+            if (epoch == numUnfencedBrokers) {
+                shuffle();
+                epoch = 0;
+            }
+            if (offset == rackNames.size()) {
+                offset = 0;
+            }
+            List<Integer> brokers = new ArrayList<>(replicationFactor);
+            int firstRackIndex = offset;
+            while (true) {
+                Optional<String> name = rackNames.get(firstRackIndex);
+                Rack rack = racks.get(name);
+                int result = rack.nextUnfenced(epoch);
+                if (result >= 0) {
+                    brokers.add(result);
+                    break;
+                }
+                firstRackIndex++;
+                if (firstRackIndex == rackNames.size()) {
+                    firstRackIndex = 0;
+                }
+            }
+            int rackIndex = offset;
+            for (int replica = 1; replica < replicationFactor; replica++) {
+                int result = -1;
+                do {
+                    if (rackIndex == firstRackIndex) {
+                        firstRackIndex = -1;
+                    } else {
+                        Optional<String> rackName = rackNames.get(rackIndex);
+                        Rack rack = racks.get(rackName);
+                        result = rack.next(epoch);
+                    }
+                    rackIndex++;
+                    if (rackIndex == rackNames.size()) {
+                        rackIndex = 0;
+                    }
+                } while (result < 0);
+                brokers.add(result);
+            }
+            epoch++;
+            offset++;
+            return brokers;
+        }
+
+        void shuffle() {
+            Collections.shuffle(rackNames, random);
+            for (Rack rack : racks.values()) {
+                rack.shuffle(random);
+            }
+        }
+    }
+
+    private final Random random;
+
+    public StripedReplicaPlacer(Random random) {
+        this.random = random;
+    }
+
+    @Override
+    public List<List<Integer>> place(int startPartition,
+                                     int numPartitions,
+                                     short replicationFactor,
+                                     Iterator<UsableBroker> iterator) {
+        RackList rackList = new RackList(random, iterator);
+        if (rackList.numUnfencedBrokers() == 0) {
+            throw new InvalidReplicationFactorException("All brokers are currently fenced.");
+        }
+        if (replicationFactor > rackList.numTotalBrokers()) {
+            throw new InvalidReplicationFactorException("The target replication factor " +
+                "of " + replicationFactor + " cannot be reached because only " +
+                rackList.numTotalBrokers() + " broker(s) are registered.");
+        }
+        List<List<Integer>> placements = new ArrayList<>(numPartitions);
+        for (int partition = 0; partition < numPartitions; partition++) {
+            placements.add(rackList.place(replicationFactor));
+        }
+        return placements;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java b/metadata/src/main/java/org/apache/kafka/metadata/OptionalStringComparator.java
similarity index 54%
copy from metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
copy to metadata/src/main/java/org/apache/kafka/metadata/OptionalStringComparator.java
index 9a2db10..3f20507 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/OptionalStringComparator.java
@@ -17,45 +17,24 @@
 
 package org.apache.kafka.metadata;
 
-import java.util.Objects;
+import java.util.Comparator;
 import java.util.Optional;
 
 
-/**
- * A broker where a replica can be placed.
- */
-public class UsableBroker {
-    private final int id;
-
-    private final Optional<String> rack;
-
-    public UsableBroker(int id, Optional<String> rack) {
-        this.id = id;
-        this.rack = rack;
-    }
-
-    public int id() {
-        return id;
-    }
-
-    public Optional<String> rack() {
-        return rack;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof UsableBroker)) return false;
-        UsableBroker other = (UsableBroker) o;
-        return other.id == id && other.rack.equals(rack);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, rack);
-    }
+/**
+ * Orders {@code Optional<String>} values: an empty Optional sorts before any present
+ * value, and two present values are ordered by {@link String#compareTo(String)}.
+ */
+public class OptionalStringComparator implements Comparator<Optional<String>> {
+    /** Shared instance; the comparator holds no per-instance state. */
+    public static final OptionalStringComparator INSTANCE = new OptionalStringComparator();
 
     @Override
-    public String toString() {
-        return "UsableBroker(id=" + id + ", rack=" + rack + ")";
+    public int compare(Optional<String> a, Optional<String> b) {
+        boolean aPresent = a.isPresent();
+        boolean bPresent = b.isPresent();
+        if (aPresent && bPresent) {
+            // Both present: natural String ordering decides.
+            return a.get().compareTo(b.get());
+        }
+        // At most one is present: empty (false) sorts first. Boolean.compare
+        // returns exactly -1, 0 or 1.
+        return Boolean.compare(aPresent, bPresent);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java b/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
index 9a2db10..9c04ebd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
@@ -29,9 +29,12 @@ public class UsableBroker {
 
     private final Optional<String> rack;
 
-    public UsableBroker(int id, Optional<String> rack) {
+    // Whether this broker is fenced. A fenced broker can still be offered as a
+    // placement candidate; it is carried here so the placer can tell the two apart.
+    private final boolean fenced;
+
+    /**
+     * @param id        The broker id.
+     * @param rack      The broker's rack, or empty when no rack is set.
+     * @param fenced    Whether the broker is fenced.
+     */
+    public UsableBroker(int id, Optional<String> rack, boolean fenced) {
         this.id = id;
         this.rack = rack;
+        this.fenced = fenced;
     }
 
     public int id() {
@@ -42,20 +45,24 @@ public class UsableBroker {
         return rack;
     }
 
+    /**
+     * Returns true if this broker is fenced.
+     */
+    public boolean fenced() {
+        return fenced;
+    }
+
+    // The fenced flag takes part in equality (and in hashCode below), so the same
+    // broker id with a different fenced state is a different UsableBroker.
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof UsableBroker)) return false;
         UsableBroker other = (UsableBroker) o;
-        return other.id == id && other.rack.equals(rack);
+        return other.id == id && other.rack.equals(rack) && other.fenced == fenced;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, rack);
+        return Objects.hash(id, rack, fenced);
     }
 
     @Override
     public String toString() {
-        return "UsableBroker(id=" + id + ", rack=" + rack + ")";
+        return "UsableBroker(id=" + id + ", rack=" + rack + ", fenced=" + fenced + ")";
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index d70cc5c..6c8ef7e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -174,7 +174,7 @@ public class BrokerHeartbeatManagerTest {
     private static Set<UsableBroker> usableBrokersToSet(BrokerHeartbeatManager manager) {
         Set<UsableBroker> brokers = new HashSet<>();
         for (Iterator<UsableBroker> iterator = new UsableBrokerIterator(
-            manager.unfenced().iterator(),
+            manager.brokers().iterator(),
             id -> id % 2 == 0 ? Optional.of("rack1") : Optional.of("rack2"));
              iterator.hasNext(); ) {
             brokers.add(iterator.next());
@@ -193,10 +193,11 @@ public class BrokerHeartbeatManagerTest {
         manager.touch(4, true, 100);
         assertEquals(98L, manager.lowestActiveOffset());
         Set<UsableBroker> expected = new HashSet<>();
-        expected.add(new UsableBroker(0, Optional.of("rack1")));
-        expected.add(new UsableBroker(1, Optional.of("rack2")));
-        expected.add(new UsableBroker(2, Optional.of("rack1")));
-        expected.add(new UsableBroker(3, Optional.of("rack2")));
+        expected.add(new UsableBroker(0, Optional.of("rack1"), false));
+        expected.add(new UsableBroker(1, Optional.of("rack2"), false));
+        expected.add(new UsableBroker(2, Optional.of("rack1"), false));
+        expected.add(new UsableBroker(3, Optional.of("rack2"), false));
+        expected.add(new UsableBroker(4, Optional.of("rack1"), true));
         assertEquals(expected, usableBrokersToSet(manager));
         manager.updateControlledShutdownOffset(2, 0);
         assertEquals(100L, manager.lowestActiveOffset());
@@ -204,7 +205,8 @@ public class BrokerHeartbeatManagerTest {
             () -> manager.updateControlledShutdownOffset(4, 0));
         manager.touch(4, false, 100);
         manager.updateControlledShutdownOffset(4, 0);
-        expected.remove(new UsableBroker(2, Optional.of("rack1")));
+        expected.remove(new UsableBroker(2, Optional.of("rack1"), false));
+        expected.remove(new UsableBroker(4, Optional.of("rack1"), true));
         assertEquals(expected, usableBrokersToSet(manager));
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 04fc816..e5f321a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -57,7 +57,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), time, snapshotRegistry, 1000,
-                new SimpleReplicaPlacementPolicy(new Random()));
+                new StripedReplicaPlacer(new Random()));
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
 
@@ -98,7 +98,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
-            new SimpleReplicaPlacementPolicy(new Random()));
+            new StripedReplicaPlacer(new Random()));
         clusterControl.activate();
         clusterControl.replay(brokerRecord);
         assertEquals(new BrokerRegistration(1, 100,
@@ -121,7 +121,7 @@ public class ClusterControlManagerTest {
         MockRandom random = new MockRandom();
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), time, snapshotRegistry, 1000,
-            new SimpleReplicaPlacementPolicy(random));
+            new StripedReplicaPlacer(random));
         clusterControl.activate();
         for (int i = 0; i < numUsableBrokers; i++) {
             RegisterBrokerRecord brokerRecord =
@@ -158,7 +158,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), time, snapshotRegistry, 1000,
-            new SimpleReplicaPlacementPolicy(new Random()));
+            new StripedReplicaPlacer(new Random()));
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
         for (int i = 0; i < 3; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 5e5ca01..83ab5f9 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -179,9 +179,11 @@ public class QuorumControllerTest {
                         new CreatableTopicCollection(Collections.singleton(
                             new CreatableTopic().setName("foo").setNumPartitions(1).
                                 setReplicationFactor((short) 1)).iterator()));
-                // TODO: place on a fenced broker if we have no choice
                 assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
                     createTopicsRequestData).get().topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition 1 time(s): All brokers " +
+                    "are currently fenced.", active.createTopics(
+                    createTopicsRequestData).get().topics().find("foo").errorMessage());
                 assertEquals(new BrokerHeartbeatReply(true, false, false, false),
                     active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().
                             setWantFence(false).setBrokerEpoch(0L).setBrokerId(0).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 934ca7a..0e6d3a5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -87,7 +87,7 @@ public class ReplicationControlManagerTest {
         final MockRandom random = new MockRandom();
         final ClusterControlManager clusterControl = new ClusterControlManager(
             logContext, time, snapshotRegistry, 1000,
-            new SimpleReplicaPlacementPolicy(random));
+            new StripedReplicaPlacer(random));
         final ControllerMetrics metrics = new MockControllerMetrics();
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
             new LogContext(), snapshotRegistry, Collections.emptyMap());
@@ -164,7 +164,8 @@ public class ReplicationControlManagerTest {
         CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
         expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
             setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).
-                setErrorMessage("Unable to replicate the partition 3 times: there are only 0 usable brokers"));
+                setErrorMessage("Unable to replicate the partition 3 time(s): All " +
+                    "brokers are currently fenced."));
         assertEquals(expectedResponse, result.response());
 
         registerBroker(0, ctx);
@@ -182,8 +183,8 @@ public class ReplicationControlManagerTest {
             setTopicId(result2.response().topics().find("foo").topicId()));
         assertEquals(expectedResponse2, result2.response());
         ctx.replay(result2.records());
-        assertEquals(new PartitionControlInfo(new int[] {2, 0, 1},
-            new int[] {2, 0, 1}, null, null, 2, 0, 0),
+        assertEquals(new PartitionControlInfo(new int[] {1, 2, 0},
+            new int[] {1, 2, 0}, null, null, 1, 0, 0),
             replicationControl.getPartition(
                 ((TopicRecord) result2.records().get(0).message()).topicId(), 0));
         ControllerResult<CreateTopicsResponseData> result3 =
@@ -197,8 +198,8 @@ public class ReplicationControlManagerTest {
         ControllerTestUtils.assertBatchIteratorContains(Arrays.asList(
             Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
                     setPartitionId(0).setTopicId(fooId).
-                    setReplicas(Arrays.asList(2, 0, 1)).setIsr(Arrays.asList(2, 0, 1)).
-                    setRemovingReplicas(null).setAddingReplicas(null).setLeader(2).
+                    setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).
+                    setRemovingReplicas(null).setAddingReplicas(null).setLeader(1).
                     setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
                 new ApiMessageAndVersion(new TopicRecord().
                     setTopicId(fooId).setName("foo"), (short) 0))),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
new file mode 100644
index 0000000..667e900
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        // Once every broker has been returned, next() keeps returning -1 for the
+        // same argument.
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        // Calling next() with a different argument starts a new pass, here
+        // beginning at the second broker and wrapping around.
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        // Non-positive replication factors are rejected.
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        // Four unfenced brokers are enough, so fenced broker 1 never appears.
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        // The fifth placement breaks the rotation above, presumably because the
+        // broker order was reshuffled after one full pass.
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        // Fenced broker 1 is needed for three replicas, and always comes last.
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+    }
+
+    /**
+     * Test placement across three racks with one fenced broker (20). Consecutive
+     * replicas in each placement come from different racks, rack names are sorted,
+     * and the fenced broker only ever fills a non-leader slot.
+     */
+    @Test
+    public void testRackListWithMultipleRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), false),
+            new UsableBroker(31, Optional.of("3"), false),
+            new UsableBroker(21, Optional.of("2"), false),
+            new UsableBroker(20, Optional.of("2"), true)).iterator());
+        assertEquals(6, rackList.numTotalBrokers());
+        assertEquals(5, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"), Optional.of("2"), Optional.of("3")), rackList.rackNames());
+        assertEquals(Arrays.asList(11, 21, 31, 10), rackList.place(4));
+        assertEquals(Arrays.asList(21, 30, 10, 20), rackList.place(4));
+        assertEquals(Arrays.asList(31, 11, 21, 30), rackList.place(4));
+    }
+
+    /**
+     * Test placement when racks 2 and 3 contain only fenced brokers and rack 4 has
+     * just one unfenced broker. With only three unfenced brokers, fenced brokers must
+     * fill out four-replica placements, but every placement still starts with an
+     * unfenced broker.
+     */
+    @Test
+    public void testRackListWithInvalidRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), true),
+            new UsableBroker(31, Optional.of("3"), true),
+            new UsableBroker(20, Optional.of("2"), true),
+            new UsableBroker(21, Optional.of("2"), true),
+            new UsableBroker(41, Optional.of("4"), false),
+            new UsableBroker(40, Optional.of("4"), true)).iterator());
+        assertEquals(8, rackList.numTotalBrokers());
+        assertEquals(3, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"),
+            Optional.of("2"),
+            Optional.of("3"),
+            Optional.of("4")), rackList.rackNames());
+        assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4));
+        assertEquals(Arrays.asList(10, 20, 31, 41), rackList.place(4));
+        assertEquals(Arrays.asList(41, 21, 30, 11), rackList.place(4));
+    }
+
+    /**
+     * Test that placement fails when every registered broker is fenced.
+     */
+    @Test
+    public void testAllBrokersFenced() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("All brokers are currently fenced.",
+            assertThrows(InvalidReplicationFactorException.class,
+                () -> placer.place(0, 1, (short) 1, Arrays.asList(
+                    new UsableBroker(11, Optional.of("1"), true),
+                    new UsableBroker(10, Optional.of("1"), true)).iterator())).getMessage());
+    }
+
+    /**
+     * Test that placement fails when the replication factor exceeds the number of
+     * registered brokers.
+     */
+    @Test
+    public void testNotEnoughBrokers() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("The target replication factor of 3 cannot be reached because only " +
+            "2 broker(s) are registered.",
+            assertThrows(InvalidReplicationFactorException.class,
+                () -> placer.place(0, 1, (short) 3, Arrays.asList(
+                    new UsableBroker(11, Optional.of("1"), false),
+                    new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
+    }
+
+    /**
+     * Test the exact placements produced for five three-replica partitions on a
+     * single rack of four unfenced brokers.
+     */
+    @Test
+    public void testSuccessfulPlacement() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals(Arrays.asList(Arrays.asList(2, 3, 0),
+                Arrays.asList(3, 0, 1),
+                Arrays.asList(0, 1, 2),
+                Arrays.asList(1, 2, 3),
+                Arrays.asList(1, 0, 2)),
+            placer.place(0, 5, (short) 3, Arrays.asList(
+                new UsableBroker(0, Optional.empty(), false),
+                new UsableBroker(3, Optional.empty(), false),
+                new UsableBroker(2, Optional.empty(), false),
+                new UsableBroker(1, Optional.empty(), false)).iterator()));
+    }
+
+    /**
+     * Place 200 two-replica partitions on four unfenced brokers and check that the
+     * load is actually even: every broker leads exactly 50 partitions and appears in
+     * exactly 100 replica sets. The per-pair counts additionally pin the exact
+     * sequence produced with MockRandom.
+     */
+    @Test
+    public void testEvenDistribution() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        List<List<Integer>> replicas = placer.place(0, 200, (short) 2, Arrays.asList(
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false),
+            new UsableBroker(3, Optional.empty(), false)).iterator());
+        assertEquals(200, replicas.size());
+        Map<List<Integer>, Integer> counts = new HashMap<>();
+        int[] leaderCounts = new int[4];
+        int[] replicaCounts = new int[4];
+        for (List<Integer> partitionReplicas : replicas) {
+            counts.merge(partitionReplicas, 1, Integer::sum);
+            leaderCounts[partitionReplicas.get(0)]++;
+            for (int brokerId : partitionReplicas) {
+                replicaCounts[brokerId]++;
+            }
+        }
+        // The property the test name promises: leadership and replicas are balanced.
+        for (int brokerId = 0; brokerId < 4; brokerId++) {
+            assertEquals(50, leaderCounts[brokerId], "leader count for broker " + brokerId);
+            assertEquals(100, replicaCounts[brokerId], "replica count for broker " + brokerId);
+        }
+        // Only the 12 ordered pairs of distinct brokers may occur.
+        assertEquals(12, counts.size());
+        assertEquals(14, counts.get(Arrays.asList(0, 1)));
+        assertEquals(22, counts.get(Arrays.asList(0, 2)));
+        assertEquals(14, counts.get(Arrays.asList(0, 3)));
+        assertEquals(17, counts.get(Arrays.asList(1, 0)));
+        assertEquals(17, counts.get(Arrays.asList(1, 2)));
+        assertEquals(16, counts.get(Arrays.asList(1, 3)));
+        assertEquals(13, counts.get(Arrays.asList(2, 0)));
+        assertEquals(17, counts.get(Arrays.asList(2, 1)));
+        assertEquals(20, counts.get(Arrays.asList(2, 3)));
+        assertEquals(20, counts.get(Arrays.asList(3, 0)));
+        assertEquals(19, counts.get(Arrays.asList(3, 1)));
+        assertEquals(11, counts.get(Arrays.asList(3, 2)));
+    }
+
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java b/metadata/src/test/java/org/apache/kafka/metadata/OptionalStringComparatorTest.java
similarity index 51%
copy from metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
copy to metadata/src/test/java/org/apache/kafka/metadata/OptionalStringComparatorTest.java
index 9a2db10..68f2544 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/OptionalStringComparatorTest.java
@@ -17,45 +17,24 @@
 
 package org.apache.kafka.metadata;
 
-import java.util.Objects;
-import java.util.Optional;
-
-
-/**
- * A broker where a replica can be placed.
- */
-public class UsableBroker {
-    private final int id;
-
-    private final Optional<String> rack;
-
-    public UsableBroker(int id, Optional<String> rack) {
-        this.id = id;
-        this.rack = rack;
-    }
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
-    public int id() {
-        return id;
-    }
-
-    public Optional<String> rack() {
-        return rack;
-    }
+import java.util.Optional;
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof UsableBroker)) return false;
-        UsableBroker other = (UsableBroker) o;
-        return other.id == id && other.rack.equals(rack);
-    }
+import static org.apache.kafka.metadata.OptionalStringComparator.INSTANCE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, rack);
-    }
 
-    @Override
-    public String toString() {
-        return "UsableBroker(id=" + id + ", rack=" + rack + ")";
+@Timeout(value = 40)
+public class OptionalStringComparatorTest {
+    /**
+     * Checks the ordering defined by OptionalStringComparator. The Comparator
+     * contract only specifies the sign of compare(), so assert on the signum rather
+     * than on an exact value.
+     */
+    @Test
+    public void testComparisons() {
+        assertEquals(0, Integer.signum(INSTANCE.compare(Optional.of("foo"), Optional.of("foo"))));
+        assertEquals(-1, Integer.signum(INSTANCE.compare(Optional.of("a"), Optional.of("b"))));
+        assertEquals(1, Integer.signum(INSTANCE.compare(Optional.of("b"), Optional.of("a"))));
+        // String#compareTo may return values other than -1/1; only the sign matters.
+        assertEquals(-1, Integer.signum(INSTANCE.compare(Optional.of("a"), Optional.of("z"))));
+        assertEquals(1, Integer.signum(INSTANCE.compare(Optional.of("z"), Optional.of("a"))));
+        assertEquals(-1, Integer.signum(INSTANCE.compare(Optional.empty(), Optional.of("a"))));
+        assertEquals(1, Integer.signum(INSTANCE.compare(Optional.of("a"), Optional.empty())));
+        // Empty sorts before any present value, even the empty string.
+        assertEquals(-1, Integer.signum(INSTANCE.compare(Optional.empty(), Optional.of(""))));
+        assertEquals(0, Integer.signum(INSTANCE.compare(Optional.empty(), Optional.empty())));
     }
 }
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index d8873b2..b9085cb 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -75,11 +75,13 @@ class RoundTripFaultTest(Test):
             return []
 
     @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
     def test_round_trip_workload(self, metadata_quorum=quorum.zk):
+        """Run the round-trip workload with no injected faults.
+
+        Waits up to 600 seconds for the workload to finish. The @matrix
+        decorator parameterizes metadata_quorum over quorum.all_non_upgrade.
+        """
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         workload1.wait_for_done(timeout_sec=600)
 
     @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
     def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
@@ -93,6 +95,7 @@ class RoundTripFaultTest(Test):
         partition1.wait_for_done()
 
     @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
     def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
@@ -105,6 +108,7 @@ class RoundTripFaultTest(Test):
         self.kafka.stop_node(self.kafka.nodes[0], False)
 
     @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
     def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
@@ -117,6 +121,7 @@ class RoundTripFaultTest(Test):
         stop1.wait_for_done()
 
     @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
     def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)