You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/11/30 22:11:04 UTC

(kafka) branch trunk updated: KAFKA-15886: Always specify directories for new partition registrations

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b87c852911 KAFKA-15886: Always specify directories for new partition registrations
6b87c852911 is described below

commit 6b87c8529113939f3a3709603b6f37d733daf49a
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Wed Nov 22 16:44:35 2023 +0000

    KAFKA-15886: Always specify directories for new partition registrations
    
    When creating partition registrations directories must always be defined.
    
    If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that
    does not support directory assignments, then DirectoryId.MIGRATING is assumed.
    
    If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be
    specified, unless the target broker has a single online directory registered, in which case the
    replica should be assigned directly to that single directory.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../kafka/server/metadata/KRaftMetadataCache.scala |  17 +--
 .../server/ReplicaManagerConcurrencyTest.scala     |   3 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |   3 +-
 .../kafka/zk/migration/ZkMigrationClientTest.scala | 112 +++++++++++++++--
 .../kafka/controller/PartitionChangeBuilder.java   |  33 ++++-
 .../controller/ReplicationControlManager.java      | 134 +++++++++++++--------
 .../apache/kafka/metadata/BrokerRegistration.java  |   3 +-
 .../kafka/metadata/PartitionRegistration.java      |  19 ++-
 .../kafka/metadata/placement/ClusterDescriber.java |   8 +-
 ...usterDescriber.java => DefaultDirProvider.java} |  20 ++-
 .../metadata/placement/PartitionAssignment.java    |  40 ++++--
 .../metadata/placement/StripedReplicaPlacer.java   |   2 +-
 .../controller/ClusterControlManagerTest.java      |  22 +++-
 .../controller/PartitionChangeBuilderTest.java     | 128 +++++++++++++++++---
 .../PartitionReassignmentReplicasTest.java         |  23 ++++
 .../PartitionReassignmentRevertTest.java           |  32 +++++
 .../controller/ReplicationControlManagerTest.java  |  68 +++++++++--
 .../ControllerMetadataMetricsPublisherTest.java    |   3 +-
 .../metrics/ControllerMetricsChangesTest.java      |   3 +-
 .../metrics/ControllerMetricsTestUtils.java        |   2 +
 .../org/apache/kafka/image/ImageDowngradeTest.java |   6 +-
 .../org/apache/kafka/image/MetadataImageTest.java  |   3 +-
 .../org/apache/kafka/image/TopicsImageTest.java    |  18 ++-
 .../kafka/metadata/BrokerRegistrationTest.java     |   7 +-
 .../kafka/metadata/PartitionRegistrationTest.java  |  81 +++++++++++--
 .../placement/PartitionAssignmentTest.java         |  11 +-
 .../placement/StripedReplicaPlacerTest.java        |  15 ++-
 .../metadata/placement/TopicAssignmentTest.java    |  11 +-
 .../kafka/metadata/util/MetadataFeatureUtil.java}  |  24 ++--
 .../java/org/apache/kafka/common/DirectoryId.java  |  33 ++++-
 .../org/apache/kafka/common/DirectoryIdTest.java   |  63 ++++------
 31 files changed, 728 insertions(+), 219 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 9a29fb77889..485bba8812c 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
 import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
-import org.apache.kafka.common.{Cluster, DirectoryId, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
@@ -155,18 +155,11 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     offlineReplicas
   }
 
-  private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) = {
-    broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isInOfflineDir(broker, partition)
-  }
+  private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
+    broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)
 
-  private def isInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean = {
-    partition.directory(broker.id()) match {
-      case DirectoryId.LOST => true
-      case DirectoryId.MIGRATING => false
-      case DirectoryId.UNASSIGNED => false
-      case dir => !broker.hasOnlineDir(dir)
-    }
-  }
+  private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
+    !broker.hasOnlineDir(partition.directory(broker.id()))
 
   /**
    * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 9e7f1022e50..9813c0b73d9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
@@ -474,6 +474,7 @@ class ReplicaManagerConcurrencyTest {
   ): PartitionRegistration = {
     new PartitionRegistration.Builder().
       setReplicas(replicaIds.toArray).
+      setDirectories(DirectoryId.unassignedArray(replicaIds.size)).
       setIsr(isr.toArray).
       setLeader(leader).
       setLeaderRecoveryState(leaderRecoveryState).
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 136fb87e894..f80e2601979 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTop
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.utils.Exit
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, TopicImage, TopicsImage}
 import org.apache.kafka.image.loader.LogDeltaManifest
@@ -162,6 +162,7 @@ class BrokerMetadataPublisherTest {
     val partitionRegistrations = partitions.map { case (partitionId, replicas) =>
       Int.box(partitionId) -> new PartitionRegistration.Builder().
         setReplicas(replicas.toArray).
+        setDirectories(DirectoryId.unassignedArray(replicas.size)).
         setIsr(replicas.toArray).
         setLeader(replicas.head).
         setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index 773c42a66e4..93b29c701c4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -23,7 +23,7 @@ import kafka.server.{ConfigType, KafkaConfig}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, PartitionRecord, ProducerIdsRecord, TopicRecord}
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.migration.{KRaftMigrationZkWriter, ZkMigrationLeadershipState}
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
@@ -79,8 +79,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
     assertEquals(0, migrationState.migrationZkVersion())
 
     val partitions = Map(
-      0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(1, 2)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(6).setPartitionEpoch(-1).build(),
-      1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(3)).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(7).setPartitionEpoch(-1).build()
+      0 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(0, 1, 2))
+        .setDirectories(DirectoryId.migratingArray(3))
+        .setIsr(Array(1, 2))
+        .setLeader(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(6)
+        .setPartitionEpoch(-1)
+        .build(),
+      1 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(1, 2, 3))
+        .setDirectories(DirectoryId.migratingArray(3))
+        .setIsr(Array(3))
+        .setLeader(3)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(7)
+        .setPartitionEpoch(-1)
+        .build()
     ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
     migrationState = migrationClient.topicClient().updateTopicPartitions(Map("test" -> partitions).asJava, migrationState)
     assertEquals(1, migrationState.migrationZkVersion())
@@ -106,8 +122,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
     assertEquals(0, migrationState.migrationZkVersion())
 
     val partitions = Map(
-      0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
-      1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+      0 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(0, 1, 2))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(0, 1, 2))
+        .setLeader(0)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build(),
+      1 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(1, 2, 3))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(1, 2, 3))
+        .setLeader(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build()
     ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
     migrationState = migrationClient.topicClient().createTopic("test", Uuid.randomUuid(), partitions, migrationState)
     assertEquals(1, migrationState.migrationZkVersion())
@@ -129,8 +161,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
     assertEquals(0, migrationState.migrationZkVersion())
 
     val partitions = Map(
-      0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
-      1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+      0 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(0, 1, 2))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(0, 1, 2))
+        .setLeader(0)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build(),
+      1 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(1, 2, 3))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(1, 2, 3))
+        .setLeader(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build()
     ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
     val topicId = Uuid.randomUuid()
     migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
@@ -375,8 +423,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
 
     val topicId = Uuid.randomUuid()
     val partitions = Map(
-      0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
-      1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+      0 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(0, 1, 2))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(0, 1, 2))
+        .setLeader(0)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build(),
+      1 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(1, 2, 3))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(1, 2, 3))
+        .setLeader(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build()
     ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
     migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
     assertEquals(1, migrationState.migrationZkVersion())
@@ -384,8 +448,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
     // Change assignment in partitions and update the topic assignment. See the change is
     // reflected.
     val changedPartitions = Map(
-      0 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
-      1 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+      0 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(1, 2, 3))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(1, 2, 3))
+        .setLeader(0)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build(),
+      1 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(0, 1, 2))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(0, 1, 2))
+        .setLeader(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build()
     ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
     migrationState = migrationClient.topicClient().updateTopic("test", topicId, changedPartitions, migrationState)
     assertEquals(2, migrationState.migrationZkVersion())
@@ -406,7 +486,15 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
 
     // Add a new Partition.
     val newPartition = Map(
-      2 -> new PartitionRegistration.Builder().setReplicas(Array(2, 3, 4)).setIsr(Array(2, 3, 4)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+      2 -> new PartitionRegistration.Builder()
+        .setReplicas(Array(2, 3, 4))
+        .setDirectories(DirectoryId.unassignedArray(3))
+        .setIsr(Array(2, 3, 4))
+        .setLeader(1)
+        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(-1)
+        .build()
     ).map { case (k, v) => int2Integer(k) -> v }.asJava
     migrationState = migrationClient.topicClient().createTopicPartitions(Map("test" -> newPartition).asJava, migrationState)
     assertEquals(3, migrationState.migrationZkVersion())
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index a72528e7e1b..0ccc09bcdfb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.controller;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.IntPredicate;
@@ -33,6 +35,7 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.DefaultDirProvider;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.slf4j.Logger;
@@ -94,6 +97,8 @@ public class PartitionChangeBuilder {
     private boolean zkMigrationEnabled;
     private boolean eligibleLeaderReplicasEnabled;
     private int minISR;
+    private Map<Integer, Uuid> targetDirectories;
+    private DefaultDirProvider defaultDirProvider;
 
     // Whether allow electing last known leader in a Balanced recovery. Note, the last known leader will be stored in the
     // lastKnownElr field if enabled.
@@ -123,6 +128,10 @@ public class PartitionChangeBuilder {
         this.targetElr = Replicas.toList(partition.elr);
         this.targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
         this.targetLeaderRecoveryState = partition.leaderRecoveryState;
+        this.targetDirectories = DirectoryId.createAssignmentMap(partition.replicas, partition.directories);
+        this.defaultDirProvider = uuid -> {
+            throw new IllegalStateException("DefaultDirProvider is not set");
+        };
     }
 
     public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
@@ -184,6 +193,16 @@ public class PartitionChangeBuilder {
         return this;
     }
 
+    public PartitionChangeBuilder setDirectory(int brokerId, Uuid dir) {
+        this.targetDirectories.put(brokerId, dir);
+        return this;
+    }
+
+    public PartitionChangeBuilder setDefaultDirProvider(DefaultDirProvider defaultDirProvider) {
+        this.defaultDirProvider = defaultDirProvider;
+        return this;
+    }
+
     // VisibleForTesting
     static class ElectionResult {
         final int node;
@@ -446,11 +465,19 @@ public class PartitionChangeBuilder {
     }
 
     private void setAssignmentChanges(PartitionChangeRecord record) {
-        if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
+        if (!targetReplicas.isEmpty()) {
             if (metadataVersion.isDirectoryAssignmentSupported()) {
-                record.setDirectories(DirectoryId.createDirectoriesFrom(partition.replicas, partition.directories, targetReplicas));
+                List<Uuid> directories = new ArrayList<>(targetReplicas.size());
+                for (int replica : targetReplicas) {
+                    directories.add(this.targetDirectories.getOrDefault(replica, defaultDirProvider.defaultDir(replica)));
+                }
+                if (!directories.equals(Arrays.asList(partition.directories))) {
+                    record.setDirectories(directories);
+                }
+            }
+            if (!targetReplicas.equals(Replicas.toList(partition.replicas))) {
+                record.setReplicas(targetReplicas);
             }
-            record.setReplicas(targetReplicas);
         }
         if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
             record.setRemovingReplicas(targetRemoving);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index da048b210f4..f087027a749 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
@@ -46,7 +47,6 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
-import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
@@ -246,6 +246,12 @@ public class ReplicationControlManager {
         public Iterator<UsableBroker> usableBrokers() {
             return clusterControl.usableBrokers();
         }
+
+        @Override
+        public Uuid defaultDir(int brokerId) {
+            // TODO KAFKA-15361 & KAFKA-15364 determine default dir based on broker registration and heartbeat
+            return DirectoryId.MIGRATING;
+        }
     }
 
     static class TopicControlInfo {
@@ -675,9 +681,8 @@ public class ReplicationControlManager {
                         "Found multiple manual partition assignments for partition " +
                             assignment.partitionIndex());
                 }
-                validateManualPartitionAssignment(
-                    new PartitionAssignment(assignment.brokerIds()),
-                    replicationFactor);
+                PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber);
+                validateManualPartitionAssignment(partitionAssignment, replicationFactor);
                 replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                 List<Integer> isr = assignment.brokerIds().stream().
                     filter(clusterControl::isActive).collect(Collectors.toList());
@@ -688,7 +693,7 @@ public class ReplicationControlManager {
                 }
                 newParts.put(
                     assignment.partitionIndex(),
-                    buildPartitionRegistration(assignment.brokerIds(), isr)
+                    buildPartitionRegistration(partitionAssignment, isr)
                 );
             }
             for (int i = 0; i < newParts.size(); i++) {
@@ -724,8 +729,7 @@ public class ReplicationControlManager {
                 ), clusterDescriber);
                 for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
                     PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
-                    List<Integer> replicas = partitionAssignment.replicas();
-                    List<Integer> isr = replicas.stream().
+                    List<Integer> isr = partitionAssignment.replicas().stream().
                         filter(clusterControl::isActive).collect(Collectors.toList());
                     // If the ISR is empty, it means that all brokers are fenced or
                     // in controlled shutdown. To be consistent with the replica placer,
@@ -737,7 +741,7 @@ public class ReplicationControlManager {
                     }
                     newParts.put(
                         partitionId,
-                        buildPartitionRegistration(replicas, isr)
+                        buildPartitionRegistration(partitionAssignment, isr)
                     );
                 }
             } catch (InvalidReplicationFactorException e) {
@@ -800,11 +804,12 @@ public class ReplicationControlManager {
     }
 
     private static PartitionRegistration buildPartitionRegistration(
-        List<Integer> replicas,
+        PartitionAssignment partitionAssignment,
         List<Integer> isr
     ) {
         return new PartitionRegistration.Builder().
-            setReplicas(Replicas.toArray(replicas)).
+            setReplicas(Replicas.toArray(partitionAssignment.replicas())).
+            setDirectories(Uuid.toArray(partitionAssignment.directories())).
             setIsr(Replicas.toArray(isr)).
             setLeader(isr.get(0)).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1045,19 +1050,20 @@ public class ReplicationControlManager {
                     partition,
                     topic.id,
                     partitionId,
-                    clusterControl::isActive,
+                    new LeaderAcceptor(clusterControl, partition),
                     featureControl.metadataVersion(),
                     getTopicEffectiveMinIsr(topic.name)
-                );
-                builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
-                builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+                )
+                    .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+                    .setEligibleLeaderReplicasEnabled(isElrEnabled());
                 if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
-                builder.setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs());
-                builder.setTargetLeaderRecoveryState(
-                    LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
-                Optional<ApiMessageAndVersion> record = builder.build();
+                Optional<ApiMessageAndVersion> record = builder
+                    .setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs())
+                    .setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState()))
+                    .setDefaultDirProvider(clusterDescriber)
+                    .build();
                 if (record.isPresent()) {
                     records.add(record.get());
                     PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
@@ -1431,18 +1437,19 @@ public class ReplicationControlManager {
         if (electionType == ElectionType.UNCLEAN) {
             election = PartitionChangeBuilder.Election.UNCLEAN;
         }
-        PartitionChangeBuilder builder = new PartitionChangeBuilder(
+        Optional<ApiMessageAndVersion> record = new PartitionChangeBuilder(
             partition,
             topicId,
             partitionId,
-            clusterControl::isActive,
+            new LeaderAcceptor(clusterControl, partition),
             featureControl.metadataVersion(),
             getTopicEffectiveMinIsr(topic)
-        );
-        builder.setElection(election)
-            .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
-        builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
-        Optional<ApiMessageAndVersion> record = builder.build();
+        )
+            .setElection(election)
+            .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+            .setEligibleLeaderReplicasEnabled(isElrEnabled())
+            .setDefaultDirProvider(clusterDescriber)
+            .build();
         if (!record.isPresent()) {
             if (electionType == ElectionType.PREFERRED) {
                 return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
@@ -1569,18 +1576,19 @@ public class ReplicationControlManager {
             }
 
             // Attempt to perform a preferred leader election
-            PartitionChangeBuilder builder = new PartitionChangeBuilder(
+            new PartitionChangeBuilder(
                 partition,
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
-                clusterControl::isActive,
+                new LeaderAcceptor(clusterControl, partition),
                 featureControl.metadataVersion(),
                 getTopicEffectiveMinIsr(topic.name)
-            );
-            builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
-                .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
-            builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
-            builder.build().ifPresent(records::add);
+            )
+                .setElection(PartitionChangeBuilder.Election.PREFERRED)
+                .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+                .setEligibleLeaderReplicasEnabled(isElrEnabled())
+                .setDefaultDirProvider(clusterDescriber)
+                .build().ifPresent(records::add);
         }
 
         return ControllerResult.of(records, rescheduleImmediately);
@@ -1665,11 +1673,11 @@ public class ReplicationControlManager {
             partitionAssignments = new ArrayList<>();
             isrs = new ArrayList<>();
             for (int i = 0; i < topic.assignments().size(); i++) {
-                CreatePartitionsAssignment assignment = topic.assignments().get(i);
-                validateManualPartitionAssignment(new PartitionAssignment(assignment.brokerIds()),
-                    OptionalInt.of(replicationFactor));
-                partitionAssignments.add(new PartitionAssignment(assignment.brokerIds()));
-                List<Integer> isr = assignment.brokerIds().stream().
+                List<Integer> replicas = topic.assignments().get(i).brokerIds();
+                PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, clusterDescriber);
+                validateManualPartitionAssignment(partitionAssignment, OptionalInt.of(replicationFactor));
+                partitionAssignments.add(partitionAssignment);
+                List<Integer> isr = partitionAssignment.replicas().stream().
                     filter(clusterControl::isActive).collect(Collectors.toList());
                 if (isr.isEmpty()) {
                     throw new InvalidReplicaAssignmentException(
@@ -1688,7 +1696,6 @@ public class ReplicationControlManager {
         int partitionId = startPartitionId;
         for (int i = 0; i < partitionAssignments.size(); i++) {
             PartitionAssignment partitionAssignment = partitionAssignments.get(i);
-            List<Integer> replicas = partitionAssignment.replicas();
             List<Integer> isr = isrs.get(i).stream().
                 filter(clusterControl::isActive).collect(Collectors.toList());
             // If the ISR is empty, it means that all brokers are fenced or
@@ -1699,7 +1706,7 @@ public class ReplicationControlManager {
                     "Unable to replicate the partition " + replicationFactor +
                         " time(s): All brokers are currently fenced or in controlled shutdown.");
             }
-            records.add(buildPartitionRegistration(replicas, isr)
+            records.add(buildPartitionRegistration(partitionAssignment, isr)
                 .toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
                         setMetadataVersion(featureControl.metadataVersion()).
                         build()));
@@ -1790,7 +1797,7 @@ public class ReplicationControlManager {
                 partition,
                 topicIdPart.topicId(),
                 topicIdPart.partitionId(),
-                isAcceptableLeader,
+                new LeaderAcceptor(clusterControl, partition, isAcceptableLeader),
                 featureControl.metadataVersion(),
                 getTopicEffectiveMinIsr(topic.name)
             );
@@ -1805,7 +1812,8 @@ public class ReplicationControlManager {
             builder.setTargetIsr(Replicas.toList(
                 Replicas.copyWithout(partition.isr, brokerToRemove)));
 
-            builder.build().ifPresent(records::add);
+            builder.setDefaultDirProvider(clusterDescriber)
+                    .build().ifPresent(records::add);
         }
         if (records.size() != oldSize) {
             if (log.isDebugEnabled()) {
@@ -1905,7 +1913,7 @@ public class ReplicationControlManager {
             part,
             tp.topicId(),
             tp.partitionId(),
-            clusterControl::isActive,
+            new LeaderAcceptor(clusterControl, part),
             featureControl.metadataVersion(),
             getTopicEffectiveMinIsr(topicName)
         );
@@ -1914,11 +1922,13 @@ public class ReplicationControlManager {
         if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
-        builder.setTargetIsr(revert.isr()).
+        return builder
+            .setTargetIsr(revert.isr()).
             setTargetReplicas(revert.replicas()).
             setTargetRemoving(Collections.emptyList()).
-            setTargetAdding(Collections.emptyList());
-        return builder.build();
+            setTargetAdding(Collections.emptyList()).
+            setDefaultDirProvider(clusterDescriber).
+            build();
     }
 
     /**
@@ -1953,8 +1963,8 @@ public class ReplicationControlManager {
                                                                PartitionRegistration part,
                                                                ReassignablePartition target) {
         // Check that the requested partition assignment is valid.
-        PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas));
-        PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas());
+        PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory);
+        PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber);
 
         validateManualPartitionAssignment(targetAssignment, OptionalInt.empty());
 
@@ -1965,7 +1975,7 @@ public class ReplicationControlManager {
             part,
             tp.topicId(),
             tp.partitionId(),
-            clusterControl::isActive,
+            new LeaderAcceptor(clusterControl, part),
             featureControl.metadataVersion(),
             getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
         );
@@ -1980,7 +1990,7 @@ public class ReplicationControlManager {
         if (!reassignment.adding().isEmpty()) {
             builder.setTargetAdding(reassignment.adding());
         }
-        return builder.build();
+        return builder.setDefaultDirProvider(clusterDescriber).build();
     }
 
     ListPartitionReassignmentsResponseData listPartitionReassignments(
@@ -2072,4 +2082,30 @@ public class ReplicationControlManager {
             return replicaId + " (" + reason + ")";
         }
     }
+
+    private static final class LeaderAcceptor implements IntPredicate {
+        private final ClusterControlManager clusterControl;
+        private final PartitionRegistration partition;
+        private final IntPredicate isAcceptableLeader;
+
+        private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition) {
+            this(clusterControl, partition, clusterControl::isActive);
+        }
+
+        private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition, IntPredicate isAcceptableLeader) {
+            this.clusterControl = clusterControl;
+            this.partition = partition;
+            this.isAcceptableLeader = isAcceptableLeader;
+        }
+
+        @Override
+        public boolean test(int brokerId) {
+            if (!isAcceptableLeader.test(brokerId)) {
+                return false;
+            }
+            Uuid replicaDirectory = partition.directory(brokerId);
+            BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+            return brokerRegistration.hasOnlineDir(replicaDirectory);
+        }
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 04536b69f5f..ffe95d0fd4b 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.metadata;
 
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
@@ -264,7 +265,7 @@ public class BrokerRegistration {
     }
 
     public boolean hasOnlineDir(Uuid dir) {
-        return Collections.binarySearch(directories, dir) >= 0;
+        return DirectoryId.isOnline(dir, directories);
     }
 
     public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 4bbc63d8cdd..076c8c3aa2d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -112,7 +112,9 @@ public class PartitionRegistration {
         public PartitionRegistration build() {
             if (replicas == null) {
                 throw new IllegalStateException("You must set replicas.");
-            } else if (directories != null && directories.length != replicas.length) {
+            } else if (directories == null) {
+                throw new IllegalStateException("You must set directories.");
+            } else if (directories.length != replicas.length) {
                 throw new IllegalStateException("The lengths for replicas and directories do not match.");
             } else if (isr == null) {
                 throw new IllegalStateException("You must set isr.");
@@ -180,9 +182,16 @@ public class PartitionRegistration {
         return record.directories();
     }
 
+    private static Uuid[] defaultToMigrating(Uuid[] directories, int numReplicas) {
+        if (directories == null || directories.length == 0) {
+            return DirectoryId.migratingArray(numReplicas);
+        }
+        return directories;
+    }
+
     public PartitionRegistration(PartitionRecord record) {
         this(Replicas.toArray(record.replicas()),
-            Uuid.toArray(checkDirectories(record)),
+            defaultToMigrating(Uuid.toArray(checkDirectories(record)), record.replicas().size()),
             Replicas.toArray(record.isr()),
             Replicas.toArray(record.removingReplicas()),
             Replicas.toArray(record.addingReplicas()),
@@ -201,7 +210,7 @@ public class PartitionRegistration {
             throw new IllegalArgumentException("The lengths for replicas and directories do not match.");
         }
         this.replicas = replicas;
-        this.directories = directories != null && directories.length > 0 ? directories : DirectoryId.unassignedArray(replicas.length);
+        this.directories = Objects.requireNonNull(directories);
         this.isr = isr;
         this.removingReplicas = removingReplicas;
         this.addingReplicas = addingReplicas;
@@ -247,7 +256,7 @@ public class PartitionRegistration {
         int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
         int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
         return new PartitionRegistration(newReplicas,
-            newDirectories,
+            defaultToMigrating(newDirectories, replicas.length),
             newIsr,
             newRemovingReplicas,
             newAddingReplicas,
@@ -377,7 +386,7 @@ public class PartitionRegistration {
             record.setDirectories(Uuid.toList(directories));
         } else {
             for (Uuid directory : directories) {
-                if (!DirectoryId.UNASSIGNED.equals(directory)) {
+                if (!DirectoryId.MIGRATING.equals(directory)) {
                     options.handleLoss("the directory assignment state of one or more replicas");
                     break;
                 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
index 8aaa092205e..ab0ea76cc1e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.metadata.placement;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Iterator;
@@ -26,9 +27,14 @@ import java.util.Iterator;
  * Can describe a cluster to a ReplicaPlacer.
  */
 @InterfaceStability.Unstable
-public interface ClusterDescriber {
+public interface ClusterDescriber extends DefaultDirProvider {
     /**
      * Get an iterator through the usable brokers.
      */
     Iterator<UsableBroker> usableBrokers();
+
+    /**
+     * Get the default directory for new partitions placed in a given broker.
+     */
+    Uuid defaultDir(int brokerId);
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java
similarity index 63%
copy from metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
copy to metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java
index 8aaa092205e..79a5348bbf2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java
@@ -17,18 +17,16 @@
 
 package org.apache.kafka.metadata.placement;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Iterator;
-
+import org.apache.kafka.common.Uuid;
 
 /**
- * Can describe a cluster to a ReplicaPlacer.
+ * Provide the default directory for new partitions in a given broker.
+ * For brokers that are registered with multiple directories, the return value
+ * should always be {@link org.apache.kafka.common.DirectoryId#UNASSIGNED}.
+ * For brokers that are registered with a single log directory, then the return
+ * value should be the ID for that directory.
  */
-@InterfaceStability.Unstable
-public interface ClusterDescriber {
-    /**
-     * Get an iterator through the usable brokers.
-     */
-    Iterator<UsableBroker> usableBrokers();
+@FunctionalInterface
+public interface DefaultDirProvider {
+    Uuid defaultDir(int brokerId);
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
index 92373df1fe4..177d5311afd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
@@ -17,7 +17,11 @@
 
 package org.apache.kafka.metadata.placement;
 
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Uuid;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -25,14 +29,28 @@ import java.util.Objects;
 /**
  * The partition assignment.
  *
- * The assignment is represented as a list of integers where each integer is the replica ID. This class is immutable.
- * It's internal state does not change.
+ * The assignment is represented as a list of integers and {@link Uuid}s
+ * where each integer is the replica ID, and each Uuid is the ID of the
+ * directory hosting the replica in the broker.
+ * This class is immutable. It's internal state does not change.
  */
 public class PartitionAssignment {
+
     private final List<Integer> replicas;
+    private final List<Uuid> directories;
 
+    // TODO remove -- just here for testing
     public PartitionAssignment(List<Integer> replicas) {
+        this(replicas, brokerId -> DirectoryId.UNASSIGNED);
+    }
+
+    public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) {
         this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
+        Uuid[] directories = new Uuid[replicas.size()];
+        for (int i = 0; i < directories.length; i++) {
+            directories[i] = defaultDirProvider.defaultDir(replicas.get(i));
+        }
+        this.directories = Collections.unmodifiableList(Arrays.asList(directories));
     }
 
     /**
@@ -42,22 +60,28 @@ public class PartitionAssignment {
         return replicas;
     }
 
+    public List<Uuid> directories() {
+        return directories;
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof PartitionAssignment)) return false;
-        PartitionAssignment other = (PartitionAssignment) o;
-        return replicas.equals(other.replicas);
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PartitionAssignment that = (PartitionAssignment) o;
+        return Objects.equals(replicas, that.replicas) && Objects.equals(directories, that.directories);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(replicas);
+        return Objects.hash(replicas, directories);
     }
 
     @Override
     public String toString() {
         return "PartitionAssignment" +
-            "(replicas=" + replicas +
-            ")";
+                "(replicas=" + replicas +
+                ", directories=" + directories +
+                ")";
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index f13e8ce9897..3f587a3d47e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -438,7 +438,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
             placements.add(rackList.place(placement.numReplicas()));
         }
         return new TopicAssignment(
-            placements.stream().map(PartitionAssignment::new).collect(Collectors.toList())
+            placements.stream().map(replicas -> new PartitionAssignment(replicas, cluster)).collect(Collectors.toList())
         );
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 91bab7b0109..7865ced90cd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
@@ -41,8 +42,10 @@ import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metadata.placement.ClusterDescriber;
 import org.apache.kafka.metadata.placement.PartitionAssignment;
 import org.apache.kafka.metadata.placement.PlacementSpec;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -55,6 +58,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
@@ -402,10 +406,20 @@ public class ClusterControlManagerTest {
         }
         for (int i = 0; i < 100; i++) {
             List<PartitionAssignment> results = clusterControl.replicaPlacer().place(
-                new PlacementSpec(0,
-                    1,
-                    (short) 3),
-                    clusterControl::usableBrokers
+                    new PlacementSpec(0,
+                            1,
+                            (short) 3),
+                    new ClusterDescriber() {
+                        @Override
+                        public Iterator<UsableBroker> usableBrokers() {
+                            return clusterControl.usableBrokers();
+                        }
+
+                        @Override
+                        public Uuid defaultDir(int brokerId) {
+                            return DirectoryId.UNASSIGNED;
+                        }
+                    }
             ).assignments();
             HashSet<Integer> seen = new HashSet<>();
             for (Integer result : results.get(0).replicas()) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 8b346b1c612..2e7b91be396 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.controller.PartitionChangeBuilder.ElectionResult;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.DefaultDirProvider;
 import org.apache.kafka.metadata.placement.PartitionAssignment;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -48,6 +49,7 @@ import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
 import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -56,6 +58,8 @@ import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 @Timeout(value = 40)
 public class PartitionChangeBuilderTest {
+    private static final DefaultDirProvider DEFAULT_DIR_PROVIDER = brokerId -> DirectoryId.UNASSIGNED;
+
     private static Stream<Arguments> partitionChangeRecordVersions() {
         return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version));
     }
@@ -127,13 +131,24 @@ public class PartitionChangeBuilderTest {
     }
 
     private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {
-        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion, 2);
+        return new PartitionChangeBuilder(FOO,
+                FOO_ID,
+                0,
+                r -> r != 3,
+                metadataVersion,
+                2)
+                .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
     private static PartitionChangeBuilder createFooBuilder(short version) {
-        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3,
-            metadataVersionForPartitionChangeRecordVersion(version), 2).
-                setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+        return new PartitionChangeBuilder(FOO,
+                FOO_ID,
+                0,
+                r -> r != 3,
+                metadataVersionForPartitionChangeRecordVersion(version), 
+                2).
+                setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+                setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
     private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@@ -160,9 +175,14 @@ public class PartitionChangeBuilderTest {
     }
 
     private static PartitionChangeBuilder createBarBuilder(short version) {
-        return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3,
-            metadataVersionForPartitionChangeRecordVersion(version), 2).
-                setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+        return new PartitionChangeBuilder(BAR,
+                BAR_ID,
+                0,
+                r -> r != 3,
+                metadataVersionForPartitionChangeRecordVersion(version),
+                2).
+                setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+                setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
     private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
@@ -182,9 +202,14 @@ public class PartitionChangeBuilderTest {
     private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A");
 
     private static PartitionChangeBuilder createBazBuilder(short version) {
-        return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true,
-            metadataVersionForPartitionChangeRecordVersion(version), 2).
-                setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+        return new PartitionChangeBuilder(BAZ,
+                BAZ_ID,
+                0,
+                __ -> true,
+                metadataVersionForPartitionChangeRecordVersion(version),
+                2).
+                setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+                setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
     private static final PartitionRegistration OFFLINE_WITHOUT_ELR = new PartitionRegistration.Builder().
@@ -203,6 +228,11 @@ public class PartitionChangeBuilderTest {
 
     private static final PartitionRegistration OFFLINE_WITH_ELR = new PartitionRegistration.Builder().
             setReplicas(new int[] {2, 1, 3}).
+            setDirectories(new Uuid[]{
+                    Uuid.fromString("CQEqt7trRrmqyNxUT1CY0g"),
+                    Uuid.fromString("59Mb9smoSsC0bGUP2FYV8A"),
+                    Uuid.fromString("LBTmsCVJREqJuIEtwqxRDg")
+            }).
             setElr(new int[] {3}).
             setIsr(new int[] {}).
             setLastKnownElr(new int[] {2}).
@@ -219,10 +249,14 @@ public class PartitionChangeBuilderTest {
             metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
         if (metadataVersion.isElrSupported()) {
             return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1,
-                    metadataVersion, 2).setEligibleLeaderReplicasEnabled(true);
+                    metadataVersion, 2).
+                     setEligibleLeaderReplicasEnabled(true).
+                     setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
         } else {
             return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1,
-                    metadataVersion, 2).setEligibleLeaderReplicasEnabled(false);
+                    metadataVersion, 2).
+                     setEligibleLeaderReplicasEnabled(false).
+                     setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
         }
     }
 
@@ -792,6 +826,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
         // Update ISR to {1, 2}
@@ -843,6 +878,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
         builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));
@@ -888,6 +924,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
         builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 4)));
@@ -939,6 +976,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(false);
 
         builder.setUncleanShutdownReplicas(Arrays.asList(3));
@@ -967,7 +1005,7 @@ public class PartitionChangeBuilderTest {
     }
 
     @Test
-    void testKeepsDirectoriesAfterReassignment() {
+    public void testKeepsDirectoriesAfterReassignment() {
         PartitionRegistration registration = new PartitionRegistration.Builder().
                 setReplicas(new int[]{2, 1, 3}).
                 setDirectories(new Uuid[]{
@@ -983,16 +1021,20 @@ public class PartitionChangeBuilderTest {
                 build();
         Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
                 0, r -> true, MetadataVersion.IBP_3_7_IV2, 2).
-                setTargetReplicas(Arrays.asList(3, 1, 4)).build();
+                setTargetReplicas(Arrays.asList(3, 1, 5, 4)).
+                setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")).
+                setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
+                build();
         Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
                 new PartitionChangeRecord().
                         setTopicId(FOO_ID).
                         setPartitionId(0).
                         setLeader(1).
-                        setReplicas(Arrays.asList(3, 1, 4)).
+                        setReplicas(Arrays.asList(3, 1, 5, 4)).
                         setDirectories(Arrays.asList(
                                 Uuid.fromString("fM5NKyWTQHqEihjIkUl99Q"),
                                 Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"),
+                                Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg"),
                                 DirectoryId.UNASSIGNED
                         )),
                 (short) 1
@@ -1000,12 +1042,48 @@ public class PartitionChangeBuilderTest {
         assertEquals(expected, built);
     }
 
+    @Test
+    public void testUpdateDirectories() {
+        PartitionRegistration registration = new PartitionRegistration.Builder().
+                setReplicas(new int[]{2, 1, 3}).
+                setDirectories(new Uuid[]{
+                        Uuid.fromString("S1zMYZczRjWmucidLqGA5g"),
+                        Uuid.fromString("9eRNXTvFTsWUJObvW51V5A"),
+                        Uuid.fromString("UpePYVBgRAi3c4ujQrf3Kg")
+                }).
+                setIsr(new int[]{2, 1, 3}).
+                setLeader(2).
+                setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+                setLeaderEpoch(100).
+                setPartitionEpoch(200).
+                build();
+        Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
+                0, r -> true, withDirectoryAssignmentSupport(MetadataVersion.latest()), 2).
+                setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
+                setDirectory(1, DirectoryId.LOST).
+                setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
+                build();
+        Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
+                new PartitionChangeRecord().
+                        setTopicId(FOO_ID).
+                        setPartitionId(0).
+                        setDirectories(Arrays.asList(
+                                Uuid.fromString("S1zMYZczRjWmucidLqGA5g"),
+                                DirectoryId.LOST,
+                                Uuid.fromString("pN1VKs9zRzK4APflpegAVg")
+                        )),
+                (short) 2
+        ));
+        assertEquals(expected, built);
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEnabled) {
         short version = 2;
         PartitionRegistration partition = new PartitionRegistration.Builder()
             .setReplicas(new int[] {1, 2, 3, 4})
+            .setDirectories(DirectoryId.migratingArray(4))
             .setIsr(new int[] {1})
             .setElr(new int[] {3})
             .setLastKnownElr(lastKnownLeaderEnabled ? new int[] {} : new int[] {2})
@@ -1021,7 +1099,8 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(isElrEnabled(version))
-            .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
+            .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled)
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
 
         builder.setTargetIsr(Collections.emptyList());
 
@@ -1048,6 +1127,12 @@ public class PartitionChangeBuilderTest {
         short version = 2;
         PartitionRegistration partition = new PartitionRegistration.Builder()
             .setReplicas(new int[] {1, 2, 3, 4})
+            .setDirectories(new Uuid[]{
+                    Uuid.fromString("MrTKKPEpRv66ZpWv4V7EBQ"),
+                    Uuid.fromString("CkvgdEcWTVmdhfNuJXL0xA"),
+                    Uuid.fromString("4a2coMsPRkSCsiTVWSksSw"),
+                    Uuid.fromString("tmPdVjzASZ2ZqiS0cVJvtQ")
+            })
             .setIsr(new int[] {1, 2, 3, 4})
             .setElr(new int[] {})
             .setLastKnownElr(new int[] {})
@@ -1063,6 +1148,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(true)
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
 
         builder.setTargetIsr(Collections.emptyList());
@@ -1090,6 +1176,7 @@ public class PartitionChangeBuilderTest {
                 .setElection(Election.PREFERRED)
                 .setEligibleLeaderReplicasEnabled(true)
                 .setUncleanShutdownReplicas(Arrays.asList(2))
+                .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
                 .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
             PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
             assertTrue(changeRecord.lastKnownELR() == null, changeRecord.toString());
@@ -1104,6 +1191,7 @@ public class PartitionChangeBuilderTest {
         short version = 2;
         PartitionRegistration partition = new PartitionRegistration.Builder()
                 .setReplicas(new int[] {1, 2, 3, 4})
+                .setDirectories(DirectoryId.migratingArray(4))
                 .setIsr(new int[] {})
                 .setElr(new int[] {})
                 .setLastKnownElr(new int[] {1})
@@ -1118,6 +1206,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setUseLastKnownLeaderInBalancedRecovery(true)
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setEligibleLeaderReplicasEnabled(true);
 
         builder.setTargetIsr(Collections.emptyList());
@@ -1144,6 +1233,12 @@ public class PartitionChangeBuilderTest {
         short version = 2;
         PartitionRegistration partition = new PartitionRegistration.Builder()
                 .setReplicas(new int[] {1, 2, 3, 4})
+                .setDirectories(new Uuid[]{
+                        Uuid.fromString("zANDdMukTEqefOvHpmniMg"),
+                        Uuid.fromString("Ui2Eq8rbRiuW7m7uiPTRyg"),
+                        Uuid.fromString("MhgJOZrrTsKNcGM0XKK4aA"),
+                        Uuid.fromString("Y25PaCAmRfyGIKxAThhBAw")
+                })
                 .setIsr(new int[] {})
                 .setElr(new int[] {3})
                 .setLastKnownElr(new int[] {1})
@@ -1158,6 +1253,7 @@ public class PartitionChangeBuilderTest {
                 metadataVersionForPartitionChangeRecordVersion(version), 3)
             .setElection(Election.PREFERRED)
             .setEligibleLeaderReplicasEnabled(true)
+            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
             .setUseLastKnownLeaderInBalancedRecovery(true);
 
         builder.setTargetIsr(Collections.emptyList());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index a488e50639d..dd1d567e587 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Optional;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.placement.PartitionAssignment;
@@ -134,6 +135,11 @@ public class PartitionReassignmentReplicasTest {
         assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
             new PartitionRegistration.Builder().
                 setReplicas(new int[]{0, 1, 3, 2}).
+                setDirectories(new Uuid[]{
+                    Uuid.fromString("HEKOeWDdQOqr2cmHrnjqjA"),
+                    Uuid.fromString("I8kmmcM5TjOwNFnGvJLCjA"),
+                    Uuid.fromString("x8osEoRkQdupZNYpU5c3Lw"),
+                    Uuid.fromString("OT6qgtRqTiuiX8EikvAVow")}).
                 setIsr(new int[]{0, 1, 3, 2}).
                 setRemovingReplicas(new int[]{2}).
                 setAddingReplicas(new int[]{3}).
@@ -145,6 +151,12 @@ public class PartitionReassignmentReplicasTest {
         assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
             new PartitionRegistration.Builder().
                 setReplicas(new int[]{0, 1, 3, 2}).
+                setDirectories(new Uuid[]{
+                    Uuid.fromString("QrbOddSYQg6JgFu7hLvOTg"),
+                    Uuid.fromString("S585FNNoSmiSH6ZYCrNqCg"),
+                    Uuid.fromString("wjT5ieLARfKYMWIzTFwcag"),
+                    Uuid.fromString("qzX9qWPVTWuLbiEQL0cgeg")
+                }).
                 setIsr(new int[]{0, 1, 3, 2}).
                 setRemovingReplicas(new int[]{2}).
                 setLeader(0).
@@ -155,6 +167,12 @@ public class PartitionReassignmentReplicasTest {
         assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
             new PartitionRegistration.Builder().
                 setReplicas(new int[]{0, 1, 3, 2}).
+                setDirectories(new Uuid[]{
+                    Uuid.fromString("QIyJnfdUSz6laFLCgj3AjA"),
+                    Uuid.fromString("1QIvvBx2QVqNw2dsnYXUZg"),
+                    Uuid.fromString("yPvPnGrxR0q8KC2Q5k0FIg"),
+                    Uuid.fromString("a0lnxzleTcWVf1IyalE9cA")
+                }).
                 setIsr(new int[]{0, 1, 3, 2}).
                 setAddingReplicas(new int[]{3}).
                 setLeader(0).
@@ -165,6 +183,11 @@ public class PartitionReassignmentReplicasTest {
         assertFalse(PartitionReassignmentReplicas.isReassignmentInProgress(
             new PartitionRegistration.Builder().
                 setReplicas(new int[]{0, 1, 2}).
+                setDirectories(new Uuid[]{
+                    Uuid.fromString("I4qCCBe9TYGOB0xvmvTI7w"),
+                    Uuid.fromString("JvzGem0nTxiNPM5jIzNzlA"),
+                    Uuid.fromString("EfWjZ2EsSKSvEn9PkG7lWQ")
+                }).
                 setIsr(new int[]{0, 1, 2}).
                 setLeader(0).
                 setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
index 69a617c35b9..05148813e81 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
 
 import java.util.Arrays;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.junit.jupiter.api.Test;
@@ -35,6 +36,11 @@ public class PartitionReassignmentRevertTest {
     public void testNoneAddedOrRemoved() {
         PartitionRegistration registration = new PartitionRegistration.Builder().
             setReplicas(new int[] {3, 2, 1}).setIsr(new int[] {3, 2}).
+            setDirectories(new Uuid[]{
+                    Uuid.fromString("Qln01zZAQMKzFTRCw22Y4w"),
+                    Uuid.fromString("jjUcnIL2TxWEGMZ1mHvkPA"),
+                    Uuid.fromString("JSZNFA0uQFmH1N75hQxWug")
+            }).
             setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
         PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
         assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -46,6 +52,11 @@ public class PartitionReassignmentRevertTest {
     public void testSomeRemoving() {
         PartitionRegistration registration = new PartitionRegistration.Builder().
             setReplicas(new int[] {3, 2, 1}).setIsr(new int[] {3, 2}).
+            setDirectories(new Uuid[]{
+                    Uuid.fromString("WG58zlb5RR6TqdI81SCXeA"),
+                    Uuid.fromString("izoB1H6TQdOExQ4XNMNXeQ"),
+                    Uuid.fromString("TluNaJDjRemuy17sO7dDKg")
+            }).
             setRemovingReplicas(new int[]{2, 1}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
         PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
         assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -57,6 +68,13 @@ public class PartitionReassignmentRevertTest {
     public void testSomeAdding() {
         PartitionRegistration registration = new PartitionRegistration.Builder().
             setReplicas(new int[] {4, 5, 3, 2, 1}).setIsr(new int[] {4, 5, 2}).
+            setDirectories(new Uuid[]{
+                    Uuid.fromString("gR9P3R9NQ5GhtewattwuZw"),
+                    Uuid.fromString("vzgieGUjSr6vPvl3ZAWQcg"),
+                    Uuid.fromString("8UWF5CQfQDqSyzcChmbrgw"),
+                    Uuid.fromString("X3J9b4K5TumAM5a3YOKk2w"),
+                    Uuid.fromString("LjZGhHfFRSCSCQw42dLlNA")
+            }).
             setAddingReplicas(new int[]{4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
         PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
         assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -68,6 +86,13 @@ public class PartitionReassignmentRevertTest {
     public void testSomeRemovingAndAdding() {
         PartitionRegistration registration = new PartitionRegistration.Builder().
             setReplicas(new int[] {4, 5, 3, 2, 1}).setIsr(new int[] {4, 5, 2}).
+            setDirectories(new Uuid[]{
+                Uuid.fromString("IHR5DKGdQju05pbDpwfdbA"),
+                Uuid.fromString("9zsVmGReTDOAyuPEtp58Cw"),
+                Uuid.fromString("bsUouEfRSLi50Pj3nqke2A"),
+                Uuid.fromString("8l9R5BMcQZGbICOXPmxZNw"),
+                Uuid.fromString("3n5Gwv8jRMiIFMgoTxVCdA")
+            }).
             setRemovingReplicas(new int[]{2}).setAddingReplicas(new int[]{4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
         PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
         assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -79,6 +104,13 @@ public class PartitionReassignmentRevertTest {
     public void testIsrSpecialCase() {
         PartitionRegistration registration = new PartitionRegistration.Builder().
             setReplicas(new int[] {4, 5, 3, 2, 1}).setIsr(new int[] {4, 5}).
+            setDirectories(new Uuid[]{
+                    Uuid.fromString("1oXnuHL6T8y7yMtEP4FSdg"),
+                    Uuid.fromString("3ddu6izxT2aCpQrA3C2bjw"),
+                    Uuid.fromString("WvCpqmaZTaSbifd43Vl2Xg"),
+                    Uuid.fromString("n0cmj6NgTaahRMwa75FnRA"),
+                    Uuid.fromString("S2mDcyiAQAe92ZrlyodaDw")
+            }).
             setRemovingReplicas(new int[]{2}).setAddingReplicas(new int[]{4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
         PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
         assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 8ecdafe65ea..82dabed8bd6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -141,6 +141,7 @@ import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXC
 import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
 import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -345,15 +346,31 @@ public class ReplicationControlManagerTest {
         }
 
         void registerBrokers(Integer... brokerIds) throws Exception {
-            for (int brokerId : brokerIds) {
+            Object[] brokersAndDirs = new Object[brokerIds.length * 2];
+            for (int i = 0; i < brokerIds.length; i++) {
+                brokersAndDirs[i * 2] = brokerIds[i];
+                brokersAndDirs[i * 2 + 1] = Collections.singletonList(DirectoryId.UNASSIGNED);
+            }
+            registerBrokersWithDirs(brokersAndDirs);
+        }
+
+        @SuppressWarnings("unchecked")
+        void registerBrokersWithDirs(Object... brokerIdsAndDirs) throws Exception {
+            if (brokerIdsAndDirs.length % 2 != 0) {
+                throw new IllegalArgumentException("uneven number of arguments");
+            }
+            for (int i = 0; i < brokerIdsAndDirs.length / 2; i++) {
+                int brokerId = (int) brokerIdsAndDirs[i * 2];
+                List<Uuid> logDirs = (List<Uuid>) brokerIdsAndDirs[i * 2 + 1];
                 RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-                    setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).setRack(null);
+                    setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).
+                        setRack(null).setLogDirs(logDirs);
                 brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
                     setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                     setPort((short) 9092 + brokerId).
                     setName("PLAINTEXT").
                     setHost("localhost"));
-                replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 0)));
+                replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 3)));
             }
         }
 
@@ -508,7 +525,9 @@ public class ReplicationControlManagerTest {
 
     @Test
     public void testCreateTopics() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+                .build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
         request.topics().add(new CreatableTopic().setName("foo").
@@ -550,6 +569,7 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedResponse3, result3.response());
         ctx.replay(result3.records());
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}).
+            setDirectories(DirectoryId.migratingArray(3)).
             setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(),
             replicationControl.getPartition(
                 ((TopicRecord) result3.records().get(0).message()).topicId(), 0));
@@ -584,7 +604,9 @@ public class ReplicationControlManagerTest {
 
     @Test
     public void testCreateTopicsISRInvariants() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+                .build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
 
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -612,6 +634,7 @@ public class ReplicationControlManagerTest {
         // cannot be in the ISR because it is in controlled shutdown.
         assertEquals(
             new PartitionRegistration.Builder().setReplicas(new int[]{1, 0, 2}).
+                setDirectories(DirectoryId.migratingArray(3)).
                 setIsr(new int[]{0}).
                 setLeader(0).
                 setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1515,6 +1538,7 @@ public class ReplicationControlManagerTest {
         // cannot be in the ISR because it is in controlled shutdown.
         assertEquals(
             new PartitionRegistration.Builder().setReplicas(new int[]{0, 1, 2}).
+                setDirectories(DirectoryId.migratingArray(3)).
                 setIsr(new int[]{0}).
                 setLeader(0).
                 setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1568,7 +1592,10 @@ public class ReplicationControlManagerTest {
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
     public void testReassignPartitions(short version) throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        MetadataVersion metadataVersion = withDirectoryAssignmentSupport(MetadataVersion.latest());
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3);
         ctx.unfenceBrokers(0, 1, 2, 3);
@@ -1634,9 +1661,9 @@ public class ReplicationControlManagerTest {
             new PartitionChangeRecord().setTopicId(fooId).
                 setPartitionId(1).
                 setReplicas(asList(2, 1, 3)).
+                setDirectories(asList(DirectoryId.migratingArray(3))).
                 setLeader(3).
                 setRemovingReplicas(Collections.emptyList()).
-                setDirectories(Arrays.asList(DirectoryId.unassignedArray(3))).
                 setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
             new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
                 new ReassignableTopicResponse().setName("foo").setPartitions(asList(
@@ -1686,7 +1713,9 @@ public class ReplicationControlManagerTest {
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
     public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+                .build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
         ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -1702,6 +1731,7 @@ public class ReplicationControlManagerTest {
         assertEquals(
             new PartitionRegistration.Builder().
                 setReplicas(new int[] {1, 2, 3, 4}).
+                setDirectories(DirectoryId.migratingArray(4)).
                 setIsr(new int[] {1, 2, 4}).
                 setLeader(1).
                 setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1832,7 +1862,9 @@ public class ReplicationControlManagerTest {
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
     public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+                .build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
         ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -1844,6 +1876,7 @@ public class ReplicationControlManagerTest {
         assertEquals(
             new PartitionRegistration.Builder().
                 setReplicas(new int[] {1, 2, 3, 4}).
+                setDirectories(DirectoryId.migratingArray(4)).
                 setIsr(new int[] {1, 2, 3, 4}).
                 setLeader(1).
                 setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1886,7 +1919,10 @@ public class ReplicationControlManagerTest {
 
     @Test
     public void testCancelReassignPartitions() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        MetadataVersion metadataVersion = withDirectoryAssignmentSupport(MetadataVersion.latest());
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
         ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -1900,6 +1936,7 @@ public class ReplicationControlManagerTest {
         replication.handleBrokerFenced(3, fenceRecords);
         ctx.replay(fenceRecords);
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).setIsr(new int[] {1, 2, 4}).
+            setDirectories(DirectoryId.migratingArray(4)).
             setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build(), replication.getPartition(fooId, 0));
         ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
             replication.alterPartitionReassignments(
@@ -1937,10 +1974,13 @@ public class ReplicationControlManagerTest {
             alterResult.response());
         ctx.replay(alterResult.records());
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2}).
+            setDirectories(DirectoryId.migratingArray(3)).
             setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).
+            setDirectories(DirectoryId.migratingArray(4)).
             setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(fooId, 1));
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 0}).setIsr(new int[] {4, 2}).
+            setDirectories(DirectoryId.migratingArray(5)).
             setAddingReplicas(new int[] {0, 1}).setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(barId, 0));
         ListPartitionReassignmentsResponseData currentReassigning =
             new ListPartitionReassignmentsResponseData().setErrorMessage(null).
@@ -1987,8 +2027,8 @@ public class ReplicationControlManagerTest {
                     setPartitionId(0).
                     setLeader(4).
                     setReplicas(asList(2, 3, 4)).
+                    setDirectories(asList(DirectoryId.migratingArray(3))).
                     setRemovingReplicas(null).
-                    setDirectories(Arrays.asList(DirectoryId.unassignedArray(3))).
                     setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
             new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
                 new ReassignableTopicResponse().setName("foo").setPartitions(asList(
@@ -2001,6 +2041,7 @@ public class ReplicationControlManagerTest {
         ctx.replay(cancelResult.records());
         assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {2, 3, 4}).setIsr(new int[] {4, 2}).
+            setDirectories(DirectoryId.migratingArray(3)).
             setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(3).build(), replication.getPartition(barId, 0));
     }
 
@@ -2014,7 +2055,9 @@ public class ReplicationControlManagerTest {
 
     @Test
     public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+                .build();
         ctx.registerBrokers(0, 1, 2, 3, 4, 5);
         ctx.unfenceBrokers(0, 1, 2);
         Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
@@ -2022,6 +2065,7 @@ public class ReplicationControlManagerTest {
             INVALID_REPLICA_ASSIGNMENT.code());
         ctx.createPartitions(2, "foo", new int[][] {new int[] {2, 4, 5}}, NONE.code());
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {2, 4, 5}).
+                setDirectories(DirectoryId.migratingArray(3)).
                 setIsr(new int[] {2}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(),
             ctx.replicationControl.getPartition(fooId, 1));
     }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
index 1f6adf93c57..b7c28bab183 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
@@ -47,6 +47,7 @@ import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.Fak
 import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakePartitionRegistration;
 import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakeTopicImage;
 import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakeTopicsImage;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ControllerMetadataMetricsPublisherTest {
@@ -145,7 +146,7 @@ public class ControllerMetadataMetricsPublisherTest {
             MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
             ImageReWriter writer = new ImageReWriter(delta);
             IMAGE1.write(writer, new ImageWriterOptions.Builder().
-                    setMetadataVersion(delta.image().features().metadataVersion()).
+                    setMetadataVersion(withDirectoryAssignmentSupport(delta.image().features().metadataVersion())).
                     build());
             env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
             assertEquals(0, env.metrics.activeBrokerCount());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
index eaf789619d1..93473abed1a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
@@ -36,6 +36,7 @@ import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.Fak
 import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
 import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.OFFLINE;
 import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakePartitionRegistration;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ControllerMetricsChangesTest {
@@ -156,7 +157,7 @@ public class ControllerMetricsChangesTest {
 
     static {
         ImageWriterOptions options = new ImageWriterOptions.Builder().
-                setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build(); // highest MV for PartitionRecord v0
+                setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.IBP_3_7_IV0)).build(); // highest MV for PartitionRecord v0
         TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
         TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
                 toRecord(FOO_ID, 0, options).message());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
index 96c51471188..a0e3a5cd86a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.controller.metrics;
 
 import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
@@ -73,6 +74,7 @@ public class ControllerMetricsTestUtils {
         }
         return new PartitionRegistration.Builder().
             setReplicas(new int[] {0, 1, 2}).
+            setDirectories(DirectoryId.migratingArray(3)).
             setIsr(new int[] {0, 1, 2}).
             setLeader(leader).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index 213e3c63393..fc228bb5c84 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -38,9 +38,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 
 
 @Timeout(value = 40)
@@ -133,8 +132,7 @@ public class ImageDowngradeTest {
     @Test
     void testDirectoryAssignmentState() {
         MetadataVersion outputMetadataVersion = MetadataVersion.IBP_3_7_IV0;
-        MetadataVersion inputMetadataVersion = spy(outputMetadataVersion); // TODO replace with actual MV after bump for KIP-858
-        when(inputMetadataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
+        MetadataVersion inputMetadataVersion = withDirectoryAssignmentSupport(outputMetadataVersion);
         PartitionRecord testPartitionRecord = (PartitionRecord) TEST_RECORDS.get(1).message();
         writeWithExpectedLosses(outputMetadataVersion,
                 Collections.singletonList("the directory assignment state of one or more replicas"),
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 0a0d97dd558..db366b0281d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Timeout;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
@@ -114,7 +115,7 @@ public class MetadataImageTest {
 
     private static void testToImage(MetadataImage image) {
         testToImage(image, new ImageWriterOptions.Builder()
-            .setMetadataVersion(image.features().metadataVersion())
+            .setMetadataVersion(withDirectoryAssignmentSupport(image.features().metadataVersion()))
             .build(), Optional.empty());
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index d9bf0876714..9ef72125b66 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -28,6 +29,7 @@ import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.immutable.ImmutableMap;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
@@ -47,6 +49,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHAN
 import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -101,13 +104,17 @@ public class TopicsImageTest {
         TOPIC_IMAGES1 = Arrays.asList(
             newTopicImage("foo", FOO_UUID,
                 new PartitionRegistration.Builder().setReplicas(new int[] {2, 3, 4}).
+                    setDirectories(DirectoryId.migratingArray(3)).
                     setIsr(new int[] {2, 3}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(345).build(),
                 new PartitionRegistration.Builder().setReplicas(new int[] {3, 4, 5}).
+                        setDirectories(DirectoryId.migratingArray(3)).
                     setIsr(new int[] {3, 4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(4).setPartitionEpoch(684).build(),
                 new PartitionRegistration.Builder().setReplicas(new int[] {2, 4, 5}).
+                        setDirectories(DirectoryId.migratingArray(3)).
                     setIsr(new int[] {2, 4, 5}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(10).setPartitionEpoch(84).build()),
             newTopicImage("bar", BAR_UUID,
                 new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2, 3, 4}).
+                    setDirectories(DirectoryId.migratingArray(5)).
                     setIsr(new int[] {0, 1, 2, 3}).setRemovingReplicas(new int[] {1}).setAddingReplicas(new int[] {3, 4}).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(345).build()));
 
         IMAGE1 = new TopicsImage(newTopicsByIdMap(TOPIC_IMAGES1), newTopicsByNameMap(TOPIC_IMAGES1));
@@ -140,9 +147,11 @@ public class TopicsImageTest {
         List<TopicImage> topics2 = Arrays.asList(
             newTopicImage("bar", BAR_UUID,
                 new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2, 3, 4}).
+                    setDirectories(DirectoryId.migratingArray(5)).
                     setIsr(new int[] {0, 1, 2, 3}).setRemovingReplicas(new int[] {1}).setAddingReplicas(new int[] {3, 4}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(2).setPartitionEpoch(346).build()),
             newTopicImage("baz", BAZ_UUID,
                 new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).
+                    setDirectories(DirectoryId.migratingArray(4)).
                     setIsr(new int[] {3, 4}).setRemovingReplicas(new int[] {2}).setAddingReplicas(new int[] {1}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(2).setPartitionEpoch(1).build()));
         IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
     }
@@ -162,8 +171,13 @@ public class TopicsImageTest {
     }
 
     private PartitionRegistration newPartition(int[] replicas) {
+        Uuid[] directories = new Uuid[replicas.length];
+        for (int i = 0; i < replicas.length; i++) {
+            directories[i] = DirectoryId.random();
+        }
         return new PartitionRegistration.Builder().
             setReplicas(replicas).
+            setDirectories(directories).
             setIsr(replicas).
             setLeader(replicas[0]).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -426,7 +440,9 @@ public class TopicsImageTest {
 
     private static List<ApiMessageAndVersion> getImageRecords(TopicsImage image) {
         RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new ImageWriterOptions.Builder().build());
+        image.write(writer, new ImageWriterOptions.Builder().
+                setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest())).
+                build());
         return writer.records();
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index f4494a72b01..78b7ff899fc 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -36,12 +36,11 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 
 @Timeout(value = 40)
 public class BrokerRegistrationTest {
@@ -144,10 +143,8 @@ public class BrokerRegistrationTest {
     }
 
     private void testRoundTrip(BrokerRegistration registration) {
-        MetadataVersion metdataVersion = spy(MetadataVersion.latest()); // TODO replace with actual MV after bump for KIP-858
-        when(metdataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
         ImageWriterOptions options = new ImageWriterOptions.Builder().
-                setMetadataVersion(metdataVersion).
+                setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest())).
                 build();
         ApiMessageAndVersion messageAndVersion = registration.
             toRecord(options);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index b92a9f62221..3c8a402aa05 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -64,11 +65,14 @@ public class PartitionRegistrationTest {
     @Test
     public void testPartitionControlInfoMergeAndDiff() {
         PartitionRegistration a = new PartitionRegistration.Builder().
-            setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
+            setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
+            setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
         PartitionRegistration b = new PartitionRegistration.Builder().
-            setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{3}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build();
+            setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
+            setIsr(new int[]{3}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build();
         PartitionRegistration c = new PartitionRegistration.Builder().
-            setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
+            setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
+            setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
         assertEquals(b, a.merge(new PartitionChangeRecord().
             setLeader(3).setIsr(Arrays.asList(3))));
         assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
@@ -80,7 +84,9 @@ public class PartitionRegistrationTest {
     @Test
     public void testRecordRoundTrip() {
         PartitionRegistration registrationA = new PartitionRegistration.Builder().
-            setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
+            setReplicas(new int[]{1, 2, 3}).
+            setDirectories(DirectoryId.migratingArray(3)).
+            setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
         Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
         int partitionId = 4;
         ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
@@ -93,9 +99,21 @@ public class PartitionRegistrationTest {
     @Test
     public void testToLeaderAndIsrPartitionState() {
         PartitionRegistration a = new PartitionRegistration.Builder().
-            setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(123).setPartitionEpoch(456).build();
+            setReplicas(new int[]{1, 2, 3}).
+                setDirectories(new Uuid[]{
+                    Uuid.fromString("NSmkU0ieQuy2IHN59Ce0Bw"),
+                    Uuid.fromString("Y8N9gnSKSLKKFCioX2laGA"),
+                    Uuid.fromString("Oi7nvb8KQPyaGEqr4JtCRw")
+                }).
+            setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(123).setPartitionEpoch(456).build();
         PartitionRegistration b = new PartitionRegistration.Builder().
-            setReplicas(new int[]{2, 3, 4}).setIsr(new int[]{2, 3, 4}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(234).setPartitionEpoch(567).build();
+            setReplicas(new int[]{2, 3, 4}).
+                setDirectories(new Uuid[]{
+                    Uuid.fromString("tAn3q03aQAWEYkNajXm3lA"),
+                    Uuid.fromString("zgj8rqatTmWMyWBsRZyiVg"),
+                    Uuid.fromString("bAAlGAz1TN2doZjtWlvhRQ")
+                }).
+            setIsr(new int[]{2, 3, 4}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(234).setPartitionEpoch(567).build();
         assertEquals(new LeaderAndIsrPartitionState().
                 setTopicName("foo").
                 setPartitionIndex(1).
@@ -159,7 +177,7 @@ public class PartitionRegistrationTest {
     @Test
     public void testBuilderThrowsIllegalStateExceptionWhenMissingIsr() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
-            setReplicas(new int[]{0});
+            setReplicas(new int[]{0}).setDirectories(new Uuid[]{DirectoryId.UNASSIGNED});
         IllegalStateException exception = assertThrows(IllegalStateException.class, () -> builder.build());
         assertEquals("You must set isr.", exception.getMessage());
     }
@@ -168,6 +186,7 @@ public class PartitionRegistrationTest {
     public void testBuilderThrowsIllegalStateExceptionWhenMissingLeader() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0}).
+            setDirectories(new Uuid[]{DirectoryId.LOST}).
             setIsr(new int[]{0}).
             setRemovingReplicas(new int[]{0}).
             setAddingReplicas(new int[]{0});
@@ -180,6 +199,7 @@ public class PartitionRegistrationTest {
     public void testBuilderThrowsIllegalStateExceptionWhenMissingLeaderRecoveryState() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0}).
+            setDirectories(new Uuid[]{DirectoryId.MIGRATING}).
             setIsr(new int[]{0}).
             setRemovingReplicas(new int[]{0}).
             setAddingReplicas(new int[]{0}).
@@ -192,6 +212,7 @@ public class PartitionRegistrationTest {
     public void testBuilderThrowsIllegalStateExceptionWhenMissingLeaderEpoch() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0}).
+            setDirectories(new Uuid[]{Uuid.fromString("OP4I696sRmCPanlNidxJYw")}).
             setIsr(new int[]{0}).
             setRemovingReplicas(new int[]{0}).
             setAddingReplicas(new int[]{0}).
@@ -205,6 +226,7 @@ public class PartitionRegistrationTest {
     public void testBuilderThrowsIllegalStateExceptionWhenMissingPartitionEpoch() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0}).
+            setDirectories(DirectoryId.migratingArray(1)).
             setIsr(new int[]{0}).
             setRemovingReplicas(new int[]{0}).
             setAddingReplicas(new int[]{0}).
@@ -219,6 +241,7 @@ public class PartitionRegistrationTest {
     public void testBuilderSuccess() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0, 1, 2}).
+            setDirectories(DirectoryId.unassignedArray(3)).
             setIsr(new int[]{0, 1}).
             setElr(new int[]{2}).
             setLastKnownElr(new int[]{0, 1, 2}).
@@ -245,6 +268,7 @@ public class PartitionRegistrationTest {
     public void testBuilderSetsDefaultAddingAndRemovingReplicas() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0, 1}).
+            setDirectories(DirectoryId.migratingArray(2)).
             setIsr(new int[]{0, 1}).
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -325,6 +349,7 @@ public class PartitionRegistrationTest {
     public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0, 1, 2, 3, 4}).
+            setDirectories(DirectoryId.migratingArray(5)).
             setIsr(new int[]{0, 1}).
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -340,7 +365,7 @@ public class PartitionRegistrationTest {
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
             setLeaderEpoch(0).
-            setDirectories(Arrays.asList(DirectoryId.unassignedArray(5))).
+            setDirectories(Arrays.asList(DirectoryId.migratingArray(5))).
             setPartitionEpoch(0);
         List<UnwritableMetadataException> exceptions = new ArrayList<>();
         ImageWriterOptions options = new ImageWriterOptions.Builder().
@@ -370,25 +395,65 @@ public class PartitionRegistrationTest {
     Arbitrary<PartitionRegistration> uniqueSamples() {
         return Arbitraries.of(
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+                setDirectories(new Uuid[]{Uuid.fromString("HyTsxr8hT6Gq5heZMA2Bug"), Uuid.fromString("ePwTiSgFRvaKRBaUX3EcZQ"), Uuid.fromString("F3zwSDR1QWGKNNLMowVoYg")}).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+                setDirectories(new Uuid[]{Uuid.fromString("94alcrMLQ6GOV8EHfAxJnA"), Uuid.fromString("LlD2QCA5RpalzKwPsUTGpw"), Uuid.fromString("Ahfjx9j5SIKpmz48pTLFRg")}).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(101).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+                setDirectories(new Uuid[]{Uuid.fromString("KcXLjTpYSPGjM20DjHd5rA"), Uuid.fromString("NXiBSMNHSvWqvz3qM8a6Vg"), Uuid.fromString("yWinzh1DRD25nHuXUxLfBQ")}).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).setElr(new int[] {1, 2}).setLastKnownElr(new int[] {1, 2}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+                setDirectories(new Uuid[]{Uuid.fromString("9bDLWtoRRaKUToKixl3NUg"), Uuid.fromString("nLJMwhSUTEOU7DEI0U2GOw"), Uuid.fromString("ULAltTBAQlG2peJh9DZZrw")}).
                 setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1}).
+                setDirectories(new Uuid[]{Uuid.fromString("kWM0QcMoRg6BHc7sdVsjZg"), Uuid.fromString("84F4VbPGTRWewKhlCYctbQ"), Uuid.fromString("W505iUM0S6a5Ds83d1WjcQ")}).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING).setLeaderEpoch(100).setPartitionEpoch(200).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {4, 5, 6}).setAddingReplicas(new int[] {1, 2, 3}).
+                setDirectories(DirectoryId.unassignedArray(6)).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
+                setDirectories(DirectoryId.migratingArray(6)).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 3}).
+                setDirectories(DirectoryId.unassignedArray(6)).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
             new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
+                setDirectories(DirectoryId.migratingArray(6)).
                 setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {2, 3}).setLastKnownElr(new int[] {1, 2}).build()
         );
 
     }
 
+    @Test
+    public void testDirectories() {
+        PartitionRegistration partitionRegistration = new PartitionRegistration.Builder().
+                setReplicas(new int[] {3, 2, 1}).
+                setDirectories(new Uuid[]{
+                        Uuid.fromString("FbRuu7CeQtq5YFreEzg16g"),
+                        Uuid.fromString("4rtHTelWSSStAFMODOg3cQ"),
+                        Uuid.fromString("Id1WXzHURROilVxZWJNZlw")
+                }).
+                setIsr(new int[] {1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+                setLeaderEpoch(100).setPartitionEpoch(200).build();
+        assertEquals(Uuid.fromString("Id1WXzHURROilVxZWJNZlw"), partitionRegistration.directory(1));
+        assertEquals(Uuid.fromString("4rtHTelWSSStAFMODOg3cQ"), partitionRegistration.directory(2));
+        assertEquals(Uuid.fromString("FbRuu7CeQtq5YFreEzg16g"), partitionRegistration.directory(3));
+        assertThrows(IllegalArgumentException.class, () -> partitionRegistration.directory(4));
+    }
+
+    @Test
+    public void testMigratingRecordDirectories() {
+        PartitionRecord record = new PartitionRecord().
+                setTopicId(Uuid.fromString("ONlQ7DDzQtGESsG499UDQg")).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(0, 1)).
+                setIsr(Arrays.asList(0, 1)).
+                setLeader(0).
+                setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
+                setLeaderEpoch(0).
+                setPartitionEpoch(0);
+        PartitionRegistration registration = new PartitionRegistration(record);
+        assertArrayEquals(new Uuid[]{DirectoryId.MIGRATING, DirectoryId.MIGRATING}, registration.directories);
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java
index b5aacc71525..06cf5ae50d5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.metadata.placement;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
+import org.apache.kafka.common.Uuid;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -61,7 +62,13 @@ public class PartitionAssignmentTest {
     @Test
     public void testToString() {
         List<Integer> replicas = Arrays.asList(0, 1, 2);
-        PartitionAssignment partitionAssignment = new PartitionAssignment(replicas);
-        assertEquals("PartitionAssignment(replicas=[0, 1, 2])", partitionAssignment.toString());
+        List<Uuid> directories = Arrays.asList(
+                Uuid.fromString("65WMNfybQpCDVulYOxMCTw"),
+                Uuid.fromString("VkZ5AkuESPGkMc2OxpKUjw"),
+                Uuid.fromString("wFtTi4FxTlOhhHytfxv7fQ")
+        );
+        PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, directories::get);
+        assertEquals("PartitionAssignment(replicas=[0, 1, 2], " +
+                "directories=[65WMNfybQpCDVulYOxMCTw, VkZ5AkuESPGkMc2OxpKUjw, wFtTi4FxTlOhhHytfxv7fQ])", partitionAssignment.toString());
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
index ef11845ee6e..8b02416d2be 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.metadata.placement;
 
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer.BrokerList;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer.RackList;
@@ -27,6 +29,7 @@ import org.junit.jupiter.api.Timeout;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -94,7 +97,17 @@ public class StripedReplicaPlacerTest {
         PlacementSpec placementSpec = new PlacementSpec(startPartition,
             numPartitions,
             replicationFactor);
-        return placer.place(placementSpec, brokers::iterator);
+        return placer.place(placementSpec, new ClusterDescriber() {
+            @Override
+            public Iterator<UsableBroker> usableBrokers() {
+                return brokers.iterator();
+            }
+
+            @Override
+            public Uuid defaultDir(int brokerId) {
+                return DirectoryId.UNASSIGNED;
+            }
+        });
     }
 
     /**
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
index 61b2cbd6fa0..7b5a24c3b84 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.metadata.placement;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
+import org.apache.kafka.common.Uuid;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -74,10 +75,16 @@ public class TopicAssignmentTest {
     @Test
     public void testToString() {
         List<Integer> replicas = Arrays.asList(0, 1, 2);
+        List<Uuid> directories = Arrays.asList(
+                Uuid.fromString("v56qeYzNRrqNtXsxzcReog"),
+                Uuid.fromString("MvUIAsOiRlSePeiBHdZrSQ"),
+                Uuid.fromString("jUqCchHtTHqMxeVv4dw1RA")
+        );
         List<PartitionAssignment> partitionAssignments = Arrays.asList(
-            new PartitionAssignment(replicas)
+            new PartitionAssignment(replicas, directories::get)
         );
         TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
-        assertEquals("TopicAssignment(assignments=[PartitionAssignment(replicas=[0, 1, 2])])", topicAssignment.toString());
+        assertEquals("TopicAssignment(assignments=[PartitionAssignment(replicas=[0, 1, 2], " +
+                "directories=[v56qeYzNRrqNtXsxzcReog, MvUIAsOiRlSePeiBHdZrSQ, jUqCchHtTHqMxeVv4dw1RA])])", topicAssignment.toString());
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java b/metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java
similarity index 59%
copy from metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
copy to metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java
index 8aaa092205e..a01e47dbc75 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.metadata.placement;
+package org.apache.kafka.metadata.util;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.mockito.internal.util.MockUtil;
 
-import java.util.Iterator;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
-
-/**
- * Can describe a cluster to a ReplicaPlacer.
- */
-@InterfaceStability.Unstable
-public interface ClusterDescriber {
-    /**
-     * Get an iterator through the usable brokers.
-     */
-    Iterator<UsableBroker> usableBrokers();
+public class MetadataFeatureUtil {
+    public static MetadataVersion withDirectoryAssignmentSupport(MetadataVersion metadataVersion) {
+        MetadataVersion spy = MockUtil.isMock(metadataVersion) ? metadataVersion : spy(metadataVersion);
+        when(spy.isDirectoryAssignmentSupported()).thenReturn(true);
+        return spy;
+    }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
index 48008a81e4f..4233198be05 100644
--- a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
+++ b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -120,8 +121,38 @@ public class DirectoryId {
      * Create an array with the specified number of entries set to {@link #UNASSIGNED}.
      */
     public static Uuid[] unassignedArray(int length) {
+        return array(length, UNASSIGNED);
+    }
+
+    /**
+     * Create an array with the specified number of entries set to {@link #MIGRATING}.
+     */
+    public static Uuid[] migratingArray(int length) {
+        return array(length, MIGRATING);
+    }
+
+    /**
+     * Create an array with the specified number of entries set to the specified value.
+     */
+    private static Uuid[] array(int length, Uuid value) {
         Uuid[] array = new Uuid[length];
-        Arrays.fill(array, UNASSIGNED);
+        Arrays.fill(array, value);
         return array;
     }
+
+    /**
+     * Check if a directory is online, given a sorted list of online directories.
+     * @param dir              The directory to check
+     * @param sortedOnlineDirs The sorted list of online directories
+     * @return                 true if the directory is considered online, false otherwise
+     */
+    public static boolean isOnline(Uuid dir, List<Uuid> sortedOnlineDirs) {
+        if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) {
+            return true;
+        }
+        if (LOST.equals(dir)) {
+            return false;
+        }
+        return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
+    }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java b/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
index 7a984c7cf8a..5b4d427f275 100644
--- a/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
+++ b/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
@@ -20,8 +20,10 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -41,50 +43,10 @@ public class DirectoryIdTest {
         assertTrue(DirectoryId.reserved(DirectoryId.MIGRATING));
     }
 
-    @Test
-    void testCreateDirectoriesFrom() {
-        assertThrows(IllegalArgumentException.class, () -> DirectoryId.createDirectoriesFrom(
-                new int[] {1},
-                new Uuid[] {DirectoryId.UNASSIGNED, DirectoryId.LOST},
-                Arrays.asList(2, 3)
-        ));
-        assertEquals(
-            Arrays.asList(
-                Uuid.fromString("YXY0bQYEQmmyOQ6ZDfGgSQ"),
-                Uuid.fromString("5SZij3DRQgaFbvzR9KooLg"),
-                DirectoryId.UNASSIGNED
-            ),
-            DirectoryId.createDirectoriesFrom(
-                new int[] {1, 2, 3},
-                new Uuid[] {
-                        Uuid.fromString("MgVK5KSwTxe65eYATaoQrg"),
-                        Uuid.fromString("YXY0bQYEQmmyOQ6ZDfGgSQ"),
-                        Uuid.fromString("5SZij3DRQgaFbvzR9KooLg")
-                },
-                Arrays.asList(2, 3, 4)
-            )
-        );
-        assertEquals(
-                Arrays.asList(
-                        DirectoryId.UNASSIGNED,
-                        DirectoryId.UNASSIGNED,
-                        DirectoryId.UNASSIGNED
-                ),
-                DirectoryId.createDirectoriesFrom(
-                        new int[] {1, 2},
-                        new Uuid[] {
-                            DirectoryId.UNASSIGNED,
-                            DirectoryId.UNASSIGNED
-                        },
-                        Arrays.asList(1, 2, 3)
-                )
-        );
-    }
-
     @Test
     void testCreateAssignmentMap() {
-        assertThrows(IllegalArgumentException.class,
-                () -> DirectoryId.createAssignmentMap(new int[]{1, 2}, DirectoryId.unassignedArray(3)));
+        assertThrows(IllegalArgumentException.class, () ->
+                DirectoryId.createAssignmentMap(new int[]{1, 2}, DirectoryId.unassignedArray(3)));
         assertEquals(
             new HashMap<Integer, Uuid>() {{
                     put(1, Uuid.fromString("upjfkCrUR9GNn1i94ip1wg"));
@@ -102,4 +64,21 @@ public class DirectoryIdTest {
                     })
         );
     }
+
+    @Test
+    void testIsOnline() {
+        List<Uuid> sortedDirs = Arrays.asList(
+                Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"),
+                Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"),
+                Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw")
+        );
+        sortedDirs.sort(Uuid::compareTo);
+        assertTrue(DirectoryId.isOnline(Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"), sortedDirs));
+        assertTrue(DirectoryId.isOnline(Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"), sortedDirs));
+        assertTrue(DirectoryId.isOnline(Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw"), sortedDirs));
+        assertTrue(DirectoryId.isOnline(DirectoryId.MIGRATING, sortedDirs));
+        assertTrue(DirectoryId.isOnline(DirectoryId.UNASSIGNED, sortedDirs));
+        assertFalse(DirectoryId.isOnline(DirectoryId.LOST, sortedDirs));
+        assertFalse(DirectoryId.isOnline(Uuid.fromString("AMYchbMtS6yhtsXbca7DQg"), sortedDirs));
+    }
 }