Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/13 17:54:07 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10494: MINOR: improve KRaft replica placement

junrao commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r631962390



##########
File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced replica.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;

Review comment:
       Could we add a brief comment explaining epoch, index, and offset?
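
       One possible shape for such a comment, inferred from how `next(int epoch)` uses the three fields in the hunk quoted later in this review. This is a sketch: the class name `BrokerListSketch` and the comment wording are illustrative assumptions, not the author's text.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: mirrors the BrokerList.next logic from the diff, with field
// comments added. The class name and the comment wording are assumptions.
class BrokerListSketch {
    private final List<Integer> brokers = new ArrayList<>();

    // The epoch most recently passed to next(). Passing a different value
    // starts a new pass over the list.
    private int epoch = 0;

    // The number of brokers already returned during the current epoch.
    private int index = 0;

    // The list position at which the current pass starts. It advances by
    // one (wrapping around) each time a new epoch begins.
    private int offset = 0;

    BrokerListSketch add(int broker) {
        brokers.add(broker);
        return this;
    }

    // Returns the next broker ID for this epoch, or -1 once every broker
    // has been returned in this epoch (or if the list is empty).
    int next(int epoch) {
        if (brokers.isEmpty()) return -1;
        if (this.epoch != epoch) {
            this.epoch = epoch;
            this.index = 0;
            this.offset = (offset + 1) % brokers.size();
        }
        if (index >= brokers.size()) return -1;
        int broker = brokers.get((index + offset) % brokers.size());
        index++;
        return broker;
    }
}
```

       With brokers 0 through 3 and all fields starting at 0, calls under epoch 0 return 0, 1, 2, 3 and then -1, and calls under epoch 1 return 1, 2, 3, 0 and then -1, which matches the assertions in testBrokerList.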

##########
File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));

Review comment:
       Hmm, we should have shuffled the broker list here. Why is the assignment pattern repeating here?
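
       One possible reading, offered as a sketch and not as the author's answer: this test has only two unfenced brokers (2 and 3) and one fenced broker (1), and `Rack.next` returns unfenced brokers before fenced ones. If the unfenced list is walked from a rotating offset, as `BrokerList.next` does, only two orderings exist, and shuffling a two-element list can only yield one of those same two. The helper `rotate` below is hypothetical and exists only to show the period.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: shows that walking a two-element list from a rotating start
// offset alternates between exactly two orderings.
class RotationPeriodSketch {
    // Returns the list as seen when iteration starts at the given offset.
    static List<Integer> rotate(List<Integer> brokers, int offset) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < brokers.size(); i++) {
            out.add(brokers.get((i + offset) % brokers.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> unfenced = Arrays.asList(2, 3);
        for (int offset = 0; offset < 4; offset++) {
            // The fenced broker (1) would be appended last in every case.
            System.out.println("offset " + offset + ": " + rotate(unfenced, offset));
        }
    }
}
```

       Under that assumption, a test with more unfenced brokers would be needed to observe whether the shuffle actually changes the pattern.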

##########
File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced replica.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);
+        }
+
+        /**
+         * @return          The number of brokers in this list.
+         */
+        int size() {
+            return brokers.size();
+        }
+
+        /**
+         * Get the next broker in this list, or -1 if there are no more elements to be
+         * returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int next(int epoch) {
+            if (brokers.size() == 0) return -1;
+            if (this.epoch != epoch) {
+                this.epoch = epoch;
+                this.index = 0;
+                this.offset = (offset + 1) % brokers.size();
+            }
+            if (index >= brokers.size()) return -1;
+            int broker = brokers.get((index + offset) % brokers.size());
+            index++;
+            return broker;
+        }
+    }
+
+    /**
+     * A rack in the cluster, which contains brokers.
+     */
+    static class Rack {
+        private final BrokerList fenced = new BrokerList();
+        private final BrokerList unfenced = new BrokerList();
+
+        /**
+         * Initialize this rack.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            fenced.initialize(random);
+            unfenced.initialize(random);
+        }
+
+        void shuffle(Random random) {
+            fenced.shuffle(random);
+            unfenced.shuffle(random);
+        }
+
+        BrokerList fenced() {
+            return fenced;
+        }
+
+        BrokerList unfenced() {
+            return unfenced;
+        }
+
+        /**
+         * Get the next unfenced broker in this rack, or -1 if there are no more brokers
+         * to be returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int nextUnfenced(int epoch) {
+            return unfenced.next(epoch);
+        }
+
+        /**
+         * Get the next broker in this rack, or -1 if there are no more brokers to be
+         * returned.
+         *
+         * @param epoch     The current iteration epoch.
+         *
+         * @return          The broker ID, or -1 if there are no more brokers to be
+         *                  returned in this epoch.
+         */
+        int next(int epoch) {
+            int result = unfenced.next(epoch);
+            if (result >= 0) return result;
+            return fenced.next(epoch);
+        }
+    }
+
+    /**
+     * A list of racks that we can iterate through.
+     */
+    static class RackList {
+        /**
+         * The random number generator.
+         */
+        private final Random random;
+
+        /**
+         * A map from rack names to the brokers contained within them.
+         */
+        private final Map<Optional<String>, Rack> racks = new HashMap<>();
+
+        /**
+         * The names of all the racks in the cluster.
+         *
+         * Racks which have at least one unfenced broker come first (in sorted order),
+         * followed by racks which have only fenced brokers (also in sorted order).
+         */
+        private final List<Optional<String>> rackNames = new ArrayList<>();
+
+        /**
+         * The total number of brokers in the cluster, both fenced and unfenced.
+         */
+        private final int numTotalBrokers;
+
+        /**
+         * The total number of unfenced brokers in the cluster.
+         */
+        private final int numUnfencedBrokers;
+
+        /**
+         * The iteration epoch.
+         */
+        private int epoch = 0;
+
+        /**
+         * The offset we use to determine which rack is returned first.
+         */
+        private int offset;
+
+        RackList(Random random, Iterator<UsableBroker> iterator) {
+            this.random = random;
+            int numTotalBrokersCount = 0, numUnfencedBrokersCount = 0;
+            while (iterator.hasNext()) {
+                UsableBroker broker = iterator.next();
+                Rack rack = racks.get(broker.rack());
+                if (rack == null) {
+                    rackNames.add(broker.rack());
+                    rack = new Rack();
+                    racks.put(broker.rack(), rack);
+                }
+                if (broker.fenced()) {
+                    rack.fenced().add(broker.id());
+                } else {
+                    numUnfencedBrokersCount++;
+                    rack.unfenced().add(broker.id());
+                }
+                numTotalBrokersCount++;
+            }
+            for (Rack rack : racks.values()) {
+                rack.initialize(random);
+            }
+            this.rackNames.sort(OptionalStringComparator.INSTANCE);
+            this.numTotalBrokers = numTotalBrokersCount;
+            this.numUnfencedBrokers = numUnfencedBrokersCount;
+            this.offset = rackNames.isEmpty() ? 0 : random.nextInt(rackNames.size());
+        }
+
+        int numTotalBrokers() {
+            return numTotalBrokers;
+        }
+
+        int numUnfencedBrokers() {
+            return numUnfencedBrokers;
+        }
+
+        // VisibleForTesting
+        List<Optional<String>> rackNames() {
+            return rackNames;
+        }
+
+        List<Integer> place(int replicationFactor) {
+            if (replicationFactor <= 0) {
+                throw new InvalidReplicationFactorException("Invalid replication factor " +
+                        replicationFactor + ": the replication factor must be positive.");
+            }
+            // If we have returned as many assignments as there are unfenced brokers in
+            // the cluster, shuffle the rack list and broker lists to try to avoid
+            // repeating the same assignments again.
+            if (epoch == numUnfencedBrokers) {

Review comment:
       Should this be `(epoch % numUnfencedBrokers) == 0`?
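
       To make the difference concrete, here is a small sketch comparing the two conditions over a range of epoch values. The class and method names are hypothetical, and it assumes nothing about where `place` increments `epoch`, since that part of the method is not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: lists the epoch values at which each candidate condition
// would trigger a shuffle. Names here are illustrative assumptions.
class ShuffleTriggerSketch {
    // Epochs in [0, maxEpoch] for which `epoch == n` holds.
    static List<Integer> equalityTriggers(int n, int maxEpoch) {
        List<Integer> out = new ArrayList<>();
        for (int epoch = 0; epoch <= maxEpoch; epoch++) {
            if (epoch == n) out.add(epoch);
        }
        return out;
    }

    // Epochs in [0, maxEpoch] for which `(epoch % n) == 0` holds.
    // Guarded because `epoch % 0` throws ArithmeticException in Java.
    static List<Integer> moduloTriggers(int n, int maxEpoch) {
        List<Integer> out = new ArrayList<>();
        if (n <= 0) return out;
        for (int epoch = 0; epoch <= maxEpoch; epoch++) {
            if (epoch % n == 0) out.add(epoch);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("epoch == 4:     " + equalityTriggers(4, 12));
        System.out.println("epoch % 4 == 0: " + moduloTriggers(4, 12));
    }
}
```

       Two things to watch if the modulo form is adopted: it is also true at epoch 0, so the first placement would shuffle unless epoch is incremented before the check, and it needs a guard for the case where numUnfencedBrokers is 0.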

##########
File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+    }
+
+    @Test
+    public void testRackListWithMultipleRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), false),
+            new UsableBroker(31, Optional.of("3"), false),
+            new UsableBroker(21, Optional.of("2"), false),
+            new UsableBroker(20, Optional.of("2"), true)).iterator());
+        assertEquals(6, rackList.numTotalBrokers());
+        assertEquals(5, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"), Optional.of("2"), Optional.of("3")), rackList.rackNames());
+        assertEquals(Arrays.asList(11, 21, 31, 10), rackList.place(4));
+        assertEquals(Arrays.asList(21, 30, 10, 20), rackList.place(4));
+        assertEquals(Arrays.asList(31, 11, 21, 30), rackList.place(4));
+    }
+
+    @Test
+    public void testRackListWithInvalidRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), true),
+            new UsableBroker(31, Optional.of("3"), true),
+            new UsableBroker(20, Optional.of("2"), true),
+            new UsableBroker(21, Optional.of("2"), true),
+            new UsableBroker(41, Optional.of("4"), false),
+            new UsableBroker(40, Optional.of("4"), true)).iterator());
+        assertEquals(8, rackList.numTotalBrokers());
+        assertEquals(3, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"),
+            Optional.of("2"),
+            Optional.of("3"),
+            Optional.of("4")), rackList.rackNames());
+        assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4));

Review comment:
       Why didn't the leader start from rack 1, which is the first in the rack list? Also, why didn't rack 2 start with 20, which sorts first during initialization?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor-- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas on to racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, A
+ * partition 3: C, A, B
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had replica A0, A1, A2,
+ * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced replicas first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced replica.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);

Review comment:
       Should we reset the offset here too?
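
       A minimal sketch of what resetting the offset in shuffle() could look like. This is a hypothetical stand-in for BrokerList (the class name and accessors are assumptions, and the epoch/index bookkeeping from the diff is left out); whether re-drawing the offset is actually desirable is the open question here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical stand-in for BrokerList; not the code from the PR. */
final class ShuffleOffsetSketch {
    private final List<Integer> brokers = new ArrayList<>();
    private int offset = 0;

    ShuffleOffsetSketch add(int broker) {
        brokers.add(broker);
        return this;
    }

    /** Sort and pick a random start offset, mirroring initialize() in the diff. */
    void initialize(Random random) {
        if (!brokers.isEmpty()) {
            brokers.sort(Integer::compareTo);
            offset = random.nextInt(brokers.size());
        }
    }

    /** Shuffle the brokers and also re-draw the start offset (the suggested change). */
    void shuffle(Random random) {
        Collections.shuffle(brokers, random);
        if (!brokers.isEmpty()) {
            offset = random.nextInt(brokers.size());
        }
    }

    int offset() {
        return offset;
    }

    List<Integer> brokers() {
        return brokers;
    }
}
```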

##########
File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal-- if you configure 10 brokers in rack
+ * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one
+ * broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced

Review comment:
       It would be useful to document another goal: for partitions that share the same first replica, we want their second replicas to be distributed evenly across the other brokers. This way, if the first broker fails, the new leaders will be evenly distributed among the surviving brokers. The algorithm achieves that by forcing a shuffle when the partition index is a multiple of the number of brokers.
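
       To illustrate why this goal needs separate handling, here is a small sketch of what happens without any shuffle. It is not the placer's code: the class and method names are hypothetical, it assumes a single rack with brokers numbered 0..n-1 (n at least 2), and it hard-codes replication factor 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical demo: plain striping pairs every leader with a single follower. */
final class SecondReplicaSketch {
    /** Plain striping over brokers 0..numBrokers-1 with replication factor 2. */
    static List<List<Integer>> plainStriping(int numBrokers, int numPartitions) {
        List<List<Integer>> placements = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            int leader = p % numBrokers;
            int follower = (p + 1) % numBrokers;
            placements.add(Arrays.asList(leader, follower));
        }
        return placements;
    }

    /** Map each first replica to the distinct second replicas it was paired with. */
    static Map<Integer, Set<Integer>> followersByLeader(List<List<Integer>> placements) {
        Map<Integer, Set<Integer>> result = new TreeMap<>();
        for (List<Integer> replicas : placements) {
            result.computeIfAbsent(replicas.get(0), k -> new TreeSet<>()).add(replicas.get(1));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {0=[1], 1=[2], 2=[3], 3=[0]}
        System.out.println(followersByLeader(plainStriping(4, 12)));
    }
}
```

       Every leader ends up with exactly one possible successor, so if broker 0 fails, all of its partitions move to broker 1. Changing the broker order once per pass over the brokers is what breaks this fixed pairing.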

##########
File path: metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
##########
@@ -0,0 +1,182 @@
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
+import org.apache.kafka.metadata.UsableBroker;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class StripedReplicaPlacerTest {
+    /**
+     * Test that the BrokerList class works as expected.
+     */
+    @Test
+    public void testBrokerList() {
+        assertEquals(0, BrokerList.EMPTY.size());
+        assertEquals(-1, BrokerList.EMPTY.next(1));
+        BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3);
+        assertEquals(4, brokers.size());
+        assertEquals(0, brokers.next(0));
+        assertEquals(1, brokers.next(0));
+        assertEquals(2, brokers.next(0));
+        assertEquals(3, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(-1, brokers.next(0));
+        assertEquals(1, brokers.next(1));
+        assertEquals(2, brokers.next(1));
+        assertEquals(3, brokers.next(1));
+        assertEquals(0, brokers.next(1));
+        assertEquals(-1, brokers.next(1));
+    }
+
+    /**
+     * Test that we perform striped replica placement as expected, and don't use the
+     * fenced replica if we don't have to.
+     */
+    @Test
+    public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(0, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(5, rackList.numTotalBrokers());
+        assertEquals(4, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0));
+        assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1));
+        assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4));
+        assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4));
+        assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4));
+        assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4));
+        assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
+    }
+
+    /**
+     * Test that we will place on the fenced replica if we need to.
+     */
+    @Test
+    public void testPlacementOnFencedReplicaOnSingleRack() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(2, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+        assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
+        assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
+    }
+
+    @Test
+    public void testRackListWithMultipleRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), false),
+            new UsableBroker(31, Optional.of("3"), false),
+            new UsableBroker(21, Optional.of("2"), false),
+            new UsableBroker(20, Optional.of("2"), true)).iterator());
+        assertEquals(6, rackList.numTotalBrokers());
+        assertEquals(5, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"), Optional.of("2"), Optional.of("3")), rackList.rackNames());
+        assertEquals(Arrays.asList(11, 21, 31, 10), rackList.place(4));
+        assertEquals(Arrays.asList(21, 30, 10, 20), rackList.place(4));
+        assertEquals(Arrays.asList(31, 11, 21, 30), rackList.place(4));
+    }
+
+    @Test
+    public void testRackListWithInvalidRacks() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+            new UsableBroker(11, Optional.of("1"), false),
+            new UsableBroker(10, Optional.of("1"), false),
+            new UsableBroker(30, Optional.of("3"), true),
+            new UsableBroker(31, Optional.of("3"), true),
+            new UsableBroker(20, Optional.of("2"), true),
+            new UsableBroker(21, Optional.of("2"), true),
+            new UsableBroker(41, Optional.of("4"), false),
+            new UsableBroker(40, Optional.of("4"), true)).iterator());
+        assertEquals(8, rackList.numTotalBrokers());
+        assertEquals(3, rackList.numUnfencedBrokers());
+        assertEquals(Arrays.asList(Optional.of("1"),
+            Optional.of("2"),
+            Optional.of("3"),
+            Optional.of("4")), rackList.rackNames());
+        assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4));
+        assertEquals(Arrays.asList(10, 20, 31, 41), rackList.place(4));
+        assertEquals(Arrays.asList(41, 21, 30, 11), rackList.place(4));
+    }
+
+    @Test
+    public void testAllBrokersFenced() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("All brokers are currently fenced.",
+            assertThrows(InvalidReplicationFactorException.class,
+                () -> placer.place(0, 1, (short) 1, Arrays.asList(
+                    new UsableBroker(11, Optional.of("1"), true),
+                    new UsableBroker(10, Optional.of("1"), true)).iterator())).getMessage());
+    }
+
+    @Test
+    public void testNotEnoughBrokers() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("The target replication factor of 3 cannot be reached because only " +
+            "2 broker(s) are registered.",
+            assertThrows(InvalidReplicationFactorException.class,
+                () -> placer.place(0, 1, (short) 3, Arrays.asList(
+                    new UsableBroker(11, Optional.of("1"), false),
+                    new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
+    }
+
+    @Test
+    public void testSuccessfulPlacement() {

Review comment:
       Could we add a test that verifies not only that the first replicas are distributed evenly, but also that, for partitions with the same first replica, the second replicas are distributed evenly?
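
       One possible shape for the check such a test would need, sketched as a standalone helper. The class name, method name, and the "max minus min" notion of evenness are assumptions for illustration; it also assumes brokers are numbered 0..numBrokers-1 and that each placement has at least two replicas.

```java
import java.util.List;

/** Hypothetical helper for measuring how evenly second replicas are spread. */
final class SecondReplicaSpread {
    /**
     * For each first replica, count how often every other broker appears as the
     * second replica, and return the largest (max - min) gap seen for any leader.
     */
    static int maxImbalance(List<List<Integer>> placements, int numBrokers) {
        int[][] counts = new int[numBrokers][numBrokers];
        for (List<Integer> replicas : placements) {
            counts[replicas.get(0)][replicas.get(1)]++;
        }
        int worst = 0;
        for (int leader = 0; leader < numBrokers; leader++) {
            int min = Integer.MAX_VALUE;
            int max = 0;
            for (int follower = 0; follower < numBrokers; follower++) {
                if (follower == leader) {
                    continue; // a broker never follows itself
                }
                min = Math.min(min, counts[leader][follower]);
                max = Math.max(max, counts[leader][follower]);
            }
            if (min != Integer.MAX_VALUE) {
                worst = Math.max(worst, max - min);
            }
        }
        return worst;
    }
}
```

       A test could collect the replica lists produced for a large number of partitions and assert that maxImbalance stays under a small bound. The right bound depends on how the shuffle behaves under MockRandom, which this sketch does not try to predict.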



