You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/11/30 22:11:04 UTC
(kafka) branch trunk updated: KAFKA-15886: Always specify directories for new partition registrations
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b87c852911 KAFKA-15886: Always specify directories for new partition registrations
6b87c852911 is described below
commit 6b87c8529113939f3a3709603b6f37d733daf49a
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Wed Nov 22 16:44:35 2023 +0000
KAFKA-15886: Always specify directories for new partition registrations
When creating partition registrations, directories must always be defined.
If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that
does not support directory assignments, then DirectoryId.MIGRATING is assumed.
If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be
specified, unless the target broker has a single online directory registered, in which case the
replica should be assigned directly to that single directory.
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../kafka/server/metadata/KRaftMetadataCache.scala | 17 +--
.../server/ReplicaManagerConcurrencyTest.scala | 3 +-
.../metadata/BrokerMetadataPublisherTest.scala | 3 +-
.../kafka/zk/migration/ZkMigrationClientTest.scala | 112 +++++++++++++++--
.../kafka/controller/PartitionChangeBuilder.java | 33 ++++-
.../controller/ReplicationControlManager.java | 134 +++++++++++++--------
.../apache/kafka/metadata/BrokerRegistration.java | 3 +-
.../kafka/metadata/PartitionRegistration.java | 19 ++-
.../kafka/metadata/placement/ClusterDescriber.java | 8 +-
...usterDescriber.java => DefaultDirProvider.java} | 20 ++-
.../metadata/placement/PartitionAssignment.java | 40 ++++--
.../metadata/placement/StripedReplicaPlacer.java | 2 +-
.../controller/ClusterControlManagerTest.java | 22 +++-
.../controller/PartitionChangeBuilderTest.java | 128 +++++++++++++++++---
.../PartitionReassignmentReplicasTest.java | 23 ++++
.../PartitionReassignmentRevertTest.java | 32 +++++
.../controller/ReplicationControlManagerTest.java | 68 +++++++++--
.../ControllerMetadataMetricsPublisherTest.java | 3 +-
.../metrics/ControllerMetricsChangesTest.java | 3 +-
.../metrics/ControllerMetricsTestUtils.java | 2 +
.../org/apache/kafka/image/ImageDowngradeTest.java | 6 +-
.../org/apache/kafka/image/MetadataImageTest.java | 3 +-
.../org/apache/kafka/image/TopicsImageTest.java | 18 ++-
.../kafka/metadata/BrokerRegistrationTest.java | 7 +-
.../kafka/metadata/PartitionRegistrationTest.java | 81 +++++++++++--
.../placement/PartitionAssignmentTest.java | 11 +-
.../placement/StripedReplicaPlacerTest.java | 15 ++-
.../metadata/placement/TopicAssignmentTest.java | 11 +-
.../kafka/metadata/util/MetadataFeatureUtil.java} | 24 ++--
.../java/org/apache/kafka/common/DirectoryId.java | 33 ++++-
.../org/apache/kafka/common/DirectoryIdTest.java | 63 ++++------
31 files changed, 728 insertions(+), 219 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 9a29fb77889..485bba8812c 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
-import org.apache.kafka.common.{Cluster, DirectoryId, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
@@ -155,18 +155,11 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
offlineReplicas
}
- private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) = {
- broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isInOfflineDir(broker, partition)
- }
+ private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
+ broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)
- private def isInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean = {
- partition.directory(broker.id()) match {
- case DirectoryId.LOST => true
- case DirectoryId.MIGRATING => false
- case DirectoryId.UNASSIGNED => false
- case dir => !broker.hasOnlineDir(dir)
- }
- }
+ private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
+ !broker.hasOnlineDir(partition.directory(broker.id()))
/**
* Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 9e7f1022e50..9813c0b73d9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
@@ -474,6 +474,7 @@ class ReplicaManagerConcurrencyTest {
): PartitionRegistration = {
new PartitionRegistration.Builder().
setReplicas(replicaIds.toArray).
+ setDirectories(DirectoryId.unassignedArray(replicaIds.size)).
setIsr(isr.toArray).
setLeader(leader).
setLeaderRecoveryState(leaderRecoveryState).
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 136fb87e894..f80e2601979 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTop
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
import org.apache.kafka.common.utils.Exit
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, TopicImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
@@ -162,6 +162,7 @@ class BrokerMetadataPublisherTest {
val partitionRegistrations = partitions.map { case (partitionId, replicas) =>
Int.box(partitionId) -> new PartitionRegistration.Builder().
setReplicas(replicas.toArray).
+ setDirectories(DirectoryId.unassignedArray(replicas.size)).
setIsr(replicas.toArray).
setLeader(replicas.head).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index 773c42a66e4..93b29c701c4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -23,7 +23,7 @@ import kafka.server.{ConfigType, KafkaConfig}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, PartitionRecord, ProducerIdsRecord, TopicRecord}
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.migration.{KRaftMigrationZkWriter, ZkMigrationLeadershipState}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
@@ -79,8 +79,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
assertEquals(0, migrationState.migrationZkVersion())
val partitions = Map(
- 0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(1, 2)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(6).setPartitionEpoch(-1).build(),
- 1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(3)).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(7).setPartitionEpoch(-1).build()
+ 0 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(0, 1, 2))
+ .setDirectories(DirectoryId.migratingArray(3))
+ .setIsr(Array(1, 2))
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(6)
+ .setPartitionEpoch(-1)
+ .build(),
+ 1 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(1, 2, 3))
+ .setDirectories(DirectoryId.migratingArray(3))
+ .setIsr(Array(3))
+ .setLeader(3)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(7)
+ .setPartitionEpoch(-1)
+ .build()
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
migrationState = migrationClient.topicClient().updateTopicPartitions(Map("test" -> partitions).asJava, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
@@ -106,8 +122,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
assertEquals(0, migrationState.migrationZkVersion())
val partitions = Map(
- 0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
- 1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+ 0 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(0, 1, 2))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(0, 1, 2))
+ .setLeader(0)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build(),
+ 1 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(1, 2, 3))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(1, 2, 3))
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build()
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
migrationState = migrationClient.topicClient().createTopic("test", Uuid.randomUuid(), partitions, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
@@ -129,8 +161,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
assertEquals(0, migrationState.migrationZkVersion())
val partitions = Map(
- 0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
- 1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+ 0 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(0, 1, 2))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(0, 1, 2))
+ .setLeader(0)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build(),
+ 1 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(1, 2, 3))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(1, 2, 3))
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build()
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
val topicId = Uuid.randomUuid()
migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
@@ -375,8 +423,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
val topicId = Uuid.randomUuid()
val partitions = Map(
- 0 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
- 1 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+ 0 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(0, 1, 2))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(0, 1, 2))
+ .setLeader(0)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build(),
+ 1 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(1, 2, 3))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(1, 2, 3))
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build()
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
@@ -384,8 +448,24 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
// Change assignment in partitions and update the topic assignment. See the change is
// reflected.
val changedPartitions = Map(
- 0 -> new PartitionRegistration.Builder().setReplicas(Array(1, 2, 3)).setIsr(Array(1, 2, 3)).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build(),
- 1 -> new PartitionRegistration.Builder().setReplicas(Array(0, 1, 2)).setIsr(Array(0, 1, 2)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+ 0 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(1, 2, 3))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(1, 2, 3))
+ .setLeader(0)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build(),
+ 1 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(0, 1, 2))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(0, 1, 2))
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build()
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
migrationState = migrationClient.topicClient().updateTopic("test", topicId, changedPartitions, migrationState)
assertEquals(2, migrationState.migrationZkVersion())
@@ -406,7 +486,15 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
// Add a new Partition.
val newPartition = Map(
- 2 -> new PartitionRegistration.Builder().setReplicas(Array(2, 3, 4)).setIsr(Array(2, 3, 4)).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(-1).build()
+ 2 -> new PartitionRegistration.Builder()
+ .setReplicas(Array(2, 3, 4))
+ .setDirectories(DirectoryId.unassignedArray(3))
+ .setIsr(Array(2, 3, 4))
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(-1)
+ .build()
).map { case (k, v) => int2Integer(k) -> v }.asJava
migrationState = migrationClient.topicClient().createTopicPartitions(Map("test" -> newPartition).asJava, migrationState)
assertEquals(3, migrationState.migrationZkVersion())
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index a72528e7e1b..0ccc09bcdfb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -17,10 +17,12 @@
package org.apache.kafka.controller;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntPredicate;
@@ -33,6 +35,7 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.DefaultDirProvider;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
@@ -94,6 +97,8 @@ public class PartitionChangeBuilder {
private boolean zkMigrationEnabled;
private boolean eligibleLeaderReplicasEnabled;
private int minISR;
+ private Map<Integer, Uuid> targetDirectories;
+ private DefaultDirProvider defaultDirProvider;
// Whether allow electing last known leader in a Balanced recovery. Note, the last known leader will be stored in the
// lastKnownElr field if enabled.
@@ -123,6 +128,10 @@ public class PartitionChangeBuilder {
this.targetElr = Replicas.toList(partition.elr);
this.targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
this.targetLeaderRecoveryState = partition.leaderRecoveryState;
+ this.targetDirectories = DirectoryId.createAssignmentMap(partition.replicas, partition.directories);
+ this.defaultDirProvider = uuid -> {
+ throw new IllegalStateException("DefaultDirProvider is not set");
+ };
}
public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
@@ -184,6 +193,16 @@ public class PartitionChangeBuilder {
return this;
}
+ public PartitionChangeBuilder setDirectory(int brokerId, Uuid dir) {
+ this.targetDirectories.put(brokerId, dir);
+ return this;
+ }
+
+ public PartitionChangeBuilder setDefaultDirProvider(DefaultDirProvider defaultDirProvider) {
+ this.defaultDirProvider = defaultDirProvider;
+ return this;
+ }
+
// VisibleForTesting
static class ElectionResult {
final int node;
@@ -446,11 +465,19 @@ public class PartitionChangeBuilder {
}
private void setAssignmentChanges(PartitionChangeRecord record) {
- if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
+ if (!targetReplicas.isEmpty()) {
if (metadataVersion.isDirectoryAssignmentSupported()) {
- record.setDirectories(DirectoryId.createDirectoriesFrom(partition.replicas, partition.directories, targetReplicas));
+ List<Uuid> directories = new ArrayList<>(targetReplicas.size());
+ for (int replica : targetReplicas) {
+ directories.add(this.targetDirectories.getOrDefault(replica, defaultDirProvider.defaultDir(replica)));
+ }
+ if (!directories.equals(Arrays.asList(partition.directories))) {
+ record.setDirectories(directories);
+ }
+ }
+ if (!targetReplicas.equals(Replicas.toList(partition.replicas))) {
+ record.setReplicas(targetReplicas);
}
- record.setReplicas(targetReplicas);
}
if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
record.setRemovingReplicas(targetRemoving);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index da048b210f4..f087027a749 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
@@ -46,7 +47,6 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
-import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
@@ -246,6 +246,12 @@ public class ReplicationControlManager {
public Iterator<UsableBroker> usableBrokers() {
return clusterControl.usableBrokers();
}
+
+ @Override
+ public Uuid defaultDir(int brokerId) {
+ // TODO KAFKA-15361 & KAFKA-15364 determine default dir based on broker registration and heartbeat
+ return DirectoryId.MIGRATING;
+ }
}
static class TopicControlInfo {
@@ -675,9 +681,8 @@ public class ReplicationControlManager {
"Found multiple manual partition assignments for partition " +
assignment.partitionIndex());
}
- validateManualPartitionAssignment(
- new PartitionAssignment(assignment.brokerIds()),
- replicationFactor);
+ PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber);
+ validateManualPartitionAssignment(partitionAssignment, replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List<Integer> isr = assignment.brokerIds().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
@@ -688,7 +693,7 @@ public class ReplicationControlManager {
}
newParts.put(
assignment.partitionIndex(),
- buildPartitionRegistration(assignment.brokerIds(), isr)
+ buildPartitionRegistration(partitionAssignment, isr)
);
}
for (int i = 0; i < newParts.size(); i++) {
@@ -724,8 +729,7 @@ public class ReplicationControlManager {
), clusterDescriber);
for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
- List<Integer> replicas = partitionAssignment.replicas();
- List<Integer> isr = replicas.stream().
+ List<Integer> isr = partitionAssignment.replicas().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
@@ -737,7 +741,7 @@ public class ReplicationControlManager {
}
newParts.put(
partitionId,
- buildPartitionRegistration(replicas, isr)
+ buildPartitionRegistration(partitionAssignment, isr)
);
}
} catch (InvalidReplicationFactorException e) {
@@ -800,11 +804,12 @@ public class ReplicationControlManager {
}
private static PartitionRegistration buildPartitionRegistration(
- List<Integer> replicas,
+ PartitionAssignment partitionAssignment,
List<Integer> isr
) {
return new PartitionRegistration.Builder().
- setReplicas(Replicas.toArray(replicas)).
+ setReplicas(Replicas.toArray(partitionAssignment.replicas())).
+ setDirectories(Uuid.toArray(partitionAssignment.directories())).
setIsr(Replicas.toArray(isr)).
setLeader(isr.get(0)).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1045,19 +1050,20 @@ public class ReplicationControlManager {
partition,
topic.id,
partitionId,
- clusterControl::isActive,
+ new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
- );
- builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
- builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+ )
+ .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+ .setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
- builder.setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs());
- builder.setTargetLeaderRecoveryState(
- LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
- Optional<ApiMessageAndVersion> record = builder.build();
+ Optional<ApiMessageAndVersion> record = builder
+ .setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs())
+ .setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState()))
+ .setDefaultDirProvider(clusterDescriber)
+ .build();
if (record.isPresent()) {
records.add(record.get());
PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
@@ -1431,18 +1437,19 @@ public class ReplicationControlManager {
if (electionType == ElectionType.UNCLEAN) {
election = PartitionChangeBuilder.Election.UNCLEAN;
}
- PartitionChangeBuilder builder = new PartitionChangeBuilder(
+ Optional<ApiMessageAndVersion> record = new PartitionChangeBuilder(
partition,
topicId,
partitionId,
- clusterControl::isActive,
+ new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic)
- );
- builder.setElection(election)
- .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
- builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
- Optional<ApiMessageAndVersion> record = builder.build();
+ )
+ .setElection(election)
+ .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+ .setEligibleLeaderReplicasEnabled(isElrEnabled())
+ .setDefaultDirProvider(clusterDescriber)
+ .build();
if (!record.isPresent()) {
if (electionType == ElectionType.PREFERRED) {
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
@@ -1569,18 +1576,19 @@ public class ReplicationControlManager {
}
// Attempt to perform a preferred leader election
- PartitionChangeBuilder builder = new PartitionChangeBuilder(
+ new PartitionChangeBuilder(
partition,
topicPartition.topicId(),
topicPartition.partitionId(),
- clusterControl::isActive,
+ new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
- );
- builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
- .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
- builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
- builder.build().ifPresent(records::add);
+ )
+ .setElection(PartitionChangeBuilder.Election.PREFERRED)
+ .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+ .setEligibleLeaderReplicasEnabled(isElrEnabled())
+ .setDefaultDirProvider(clusterDescriber)
+ .build().ifPresent(records::add);
}
return ControllerResult.of(records, rescheduleImmediately);
@@ -1665,11 +1673,11 @@ public class ReplicationControlManager {
partitionAssignments = new ArrayList<>();
isrs = new ArrayList<>();
for (int i = 0; i < topic.assignments().size(); i++) {
- CreatePartitionsAssignment assignment = topic.assignments().get(i);
- validateManualPartitionAssignment(new PartitionAssignment(assignment.brokerIds()),
- OptionalInt.of(replicationFactor));
- partitionAssignments.add(new PartitionAssignment(assignment.brokerIds()));
- List<Integer> isr = assignment.brokerIds().stream().
+ List<Integer> replicas = topic.assignments().get(i).brokerIds();
+ PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, clusterDescriber);
+ validateManualPartitionAssignment(partitionAssignment, OptionalInt.of(replicationFactor));
+ partitionAssignments.add(partitionAssignment);
+ List<Integer> isr = partitionAssignment.replicas().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
throw new InvalidReplicaAssignmentException(
@@ -1688,7 +1696,6 @@ public class ReplicationControlManager {
int partitionId = startPartitionId;
for (int i = 0; i < partitionAssignments.size(); i++) {
PartitionAssignment partitionAssignment = partitionAssignments.get(i);
- List<Integer> replicas = partitionAssignment.replicas();
List<Integer> isr = isrs.get(i).stream().
filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
@@ -1699,7 +1706,7 @@ public class ReplicationControlManager {
"Unable to replicate the partition " + replicationFactor +
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
- records.add(buildPartitionRegistration(replicas, isr)
+ records.add(buildPartitionRegistration(partitionAssignment, isr)
.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
build()));
@@ -1790,7 +1797,7 @@ public class ReplicationControlManager {
partition,
topicIdPart.topicId(),
topicIdPart.partitionId(),
- isAcceptableLeader,
+ new LeaderAcceptor(clusterControl, partition, isAcceptableLeader),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
);
@@ -1805,7 +1812,8 @@ public class ReplicationControlManager {
builder.setTargetIsr(Replicas.toList(
Replicas.copyWithout(partition.isr, brokerToRemove)));
- builder.build().ifPresent(records::add);
+ builder.setDefaultDirProvider(clusterDescriber)
+ .build().ifPresent(records::add);
}
if (records.size() != oldSize) {
if (log.isDebugEnabled()) {
@@ -1905,7 +1913,7 @@ public class ReplicationControlManager {
part,
tp.topicId(),
tp.partitionId(),
- clusterControl::isActive,
+ new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topicName)
);
@@ -1914,11 +1922,13 @@ public class ReplicationControlManager {
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
- builder.setTargetIsr(revert.isr()).
+ return builder
+ .setTargetIsr(revert.isr()).
setTargetReplicas(revert.replicas()).
setTargetRemoving(Collections.emptyList()).
- setTargetAdding(Collections.emptyList());
- return builder.build();
+ setTargetAdding(Collections.emptyList()).
+ setDefaultDirProvider(clusterDescriber).
+ build();
}
/**
@@ -1953,8 +1963,8 @@ public class ReplicationControlManager {
PartitionRegistration part,
ReassignablePartition target) {
// Check that the requested partition assignment is valid.
- PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas));
- PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas());
+ PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory);
+ PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber);
validateManualPartitionAssignment(targetAssignment, OptionalInt.empty());
@@ -1965,7 +1975,7 @@ public class ReplicationControlManager {
part,
tp.topicId(),
tp.partitionId(),
- clusterControl::isActive,
+ new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
);
@@ -1980,7 +1990,7 @@ public class ReplicationControlManager {
if (!reassignment.adding().isEmpty()) {
builder.setTargetAdding(reassignment.adding());
}
- return builder.build();
+ return builder.setDefaultDirProvider(clusterDescriber).build();
}
ListPartitionReassignmentsResponseData listPartitionReassignments(
@@ -2072,4 +2082,30 @@ public class ReplicationControlManager {
return replicaId + " (" + reason + ")";
}
}
+
+ private static final class LeaderAcceptor implements IntPredicate {
+ private final ClusterControlManager clusterControl;
+ private final PartitionRegistration partition;
+ private final IntPredicate isAcceptableLeader;
+
+ private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition) {
+ this(clusterControl, partition, clusterControl::isActive);
+ }
+
+ private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition, IntPredicate isAcceptableLeader) {
+ this.clusterControl = clusterControl;
+ this.partition = partition;
+ this.isAcceptableLeader = isAcceptableLeader;
+ }
+
+ @Override
+ public boolean test(int brokerId) {
+ if (!isAcceptableLeader.test(brokerId)) {
+ return false;
+ }
+ Uuid replicaDirectory = partition.directory(brokerId);
+ BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+ return brokerRegistration.hasOnlineDir(replicaDirectory);
+ }
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 04536b69f5f..ffe95d0fd4b 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -17,6 +17,7 @@
package org.apache.kafka.metadata;
+import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
@@ -264,7 +265,7 @@ public class BrokerRegistration {
}
public boolean hasOnlineDir(Uuid dir) {
- return Collections.binarySearch(directories, dir) >= 0;
+ return DirectoryId.isOnline(dir, directories);
}
public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 4bbc63d8cdd..076c8c3aa2d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -112,7 +112,9 @@ public class PartitionRegistration {
public PartitionRegistration build() {
if (replicas == null) {
throw new IllegalStateException("You must set replicas.");
- } else if (directories != null && directories.length != replicas.length) {
+ } else if (directories == null) {
+ throw new IllegalStateException("You must set directories.");
+ } else if (directories.length != replicas.length) {
throw new IllegalStateException("The lengths for replicas and directories do not match.");
} else if (isr == null) {
throw new IllegalStateException("You must set isr.");
@@ -180,9 +182,16 @@ public class PartitionRegistration {
return record.directories();
}
+ private static Uuid[] defaultToMigrating(Uuid[] directories, int numReplicas) {
+ if (directories == null || directories.length == 0) {
+ return DirectoryId.migratingArray(numReplicas);
+ }
+ return directories;
+ }
+
public PartitionRegistration(PartitionRecord record) {
this(Replicas.toArray(record.replicas()),
- Uuid.toArray(checkDirectories(record)),
+ defaultToMigrating(Uuid.toArray(checkDirectories(record)), record.replicas().size()),
Replicas.toArray(record.isr()),
Replicas.toArray(record.removingReplicas()),
Replicas.toArray(record.addingReplicas()),
@@ -201,7 +210,7 @@ public class PartitionRegistration {
throw new IllegalArgumentException("The lengths for replicas and directories do not match.");
}
this.replicas = replicas;
- this.directories = directories != null && directories.length > 0 ? directories : DirectoryId.unassignedArray(replicas.length);
+ this.directories = Objects.requireNonNull(directories);
this.isr = isr;
this.removingReplicas = removingReplicas;
this.addingReplicas = addingReplicas;
@@ -247,7 +256,7 @@ public class PartitionRegistration {
int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
return new PartitionRegistration(newReplicas,
- newDirectories,
+ defaultToMigrating(newDirectories, replicas.length),
newIsr,
newRemovingReplicas,
newAddingReplicas,
@@ -377,7 +386,7 @@ public class PartitionRegistration {
record.setDirectories(Uuid.toList(directories));
} else {
for (Uuid directory : directories) {
- if (!DirectoryId.UNASSIGNED.equals(directory)) {
+ if (!DirectoryId.MIGRATING.equals(directory)) {
options.handleLoss("the directory assignment state of one or more replicas");
break;
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
index 8aaa092205e..ab0ea76cc1e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
@@ -17,6 +17,7 @@
package org.apache.kafka.metadata.placement;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Iterator;
@@ -26,9 +27,14 @@ import java.util.Iterator;
* Can describe a cluster to a ReplicaPlacer.
*/
@InterfaceStability.Unstable
-public interface ClusterDescriber {
+public interface ClusterDescriber extends DefaultDirProvider {
/**
* Get an iterator through the usable brokers.
*/
Iterator<UsableBroker> usableBrokers();
+
+ /**
+ * Get the default directory for new partitions placed in a given broker.
+ */
+ Uuid defaultDir(int brokerId);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java
similarity index 63%
copy from metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
copy to metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java
index 8aaa092205e..79a5348bbf2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java
@@ -17,18 +17,16 @@
package org.apache.kafka.metadata.placement;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Iterator;
-
+import org.apache.kafka.common.Uuid;
/**
- * Can describe a cluster to a ReplicaPlacer.
+ * Provide the default directory for new partitions in a given broker.
+ * For brokers that are registered with multiple directories, the return value
+ * should always be {@link org.apache.kafka.common.DirectoryId#UNASSIGNED}.
+ * For brokers that are registered with a single log directory, then the return
+ * value should be the ID for that directory.
*/
-@InterfaceStability.Unstable
-public interface ClusterDescriber {
- /**
- * Get an iterator through the usable brokers.
- */
- Iterator<UsableBroker> usableBrokers();
+@FunctionalInterface
+public interface DefaultDirProvider {
+ Uuid defaultDir(int brokerId);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
index 92373df1fe4..177d5311afd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java
@@ -17,7 +17,11 @@
package org.apache.kafka.metadata.placement;
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Uuid;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -25,14 +29,28 @@ import java.util.Objects;
/**
* The partition assignment.
*
- * The assignment is represented as a list of integers where each integer is the replica ID. This class is immutable.
- * It's internal state does not change.
+ * The assignment is represented as a list of integers and {@link Uuid}s
+ * where each integer is the replica ID, and each Uuid is the ID of the
+ * directory hosting the replica in the broker.
+ * This class is immutable. It's internal state does not change.
*/
public class PartitionAssignment {
+
private final List<Integer> replicas;
+ private final List<Uuid> directories;
+ // TODO remove -- just here for testing
public PartitionAssignment(List<Integer> replicas) {
+ this(replicas, brokerId -> DirectoryId.UNASSIGNED);
+ }
+
+ public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) {
this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
+ Uuid[] directories = new Uuid[replicas.size()];
+ for (int i = 0; i < directories.length; i++) {
+ directories[i] = defaultDirProvider.defaultDir(replicas.get(i));
+ }
+ this.directories = Collections.unmodifiableList(Arrays.asList(directories));
}
/**
@@ -42,22 +60,28 @@ public class PartitionAssignment {
return replicas;
}
+ public List<Uuid> directories() {
+ return directories;
+ }
+
@Override
public boolean equals(Object o) {
- if (!(o instanceof PartitionAssignment)) return false;
- PartitionAssignment other = (PartitionAssignment) o;
- return replicas.equals(other.replicas);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ PartitionAssignment that = (PartitionAssignment) o;
+ return Objects.equals(replicas, that.replicas) && Objects.equals(directories, that.directories);
}
@Override
public int hashCode() {
- return Objects.hash(replicas);
+ return Objects.hash(replicas, directories);
}
@Override
public String toString() {
return "PartitionAssignment" +
- "(replicas=" + replicas +
- ")";
+ "(replicas=" + replicas +
+ ", directories=" + directories +
+ ")";
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index f13e8ce9897..3f587a3d47e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -438,7 +438,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
placements.add(rackList.place(placement.numReplicas()));
}
return new TopicAssignment(
- placements.stream().map(PartitionAssignment::new).collect(Collectors.toList())
+ placements.stream().map(replicas -> new PartitionAssignment(replicas, cluster)).collect(Collectors.toList())
);
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 91bab7b0109..7865ced90cd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.controller;
+import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
@@ -41,8 +42,10 @@ import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.PlacementSpec;
+import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -55,6 +58,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -402,10 +406,20 @@ public class ClusterControlManagerTest {
}
for (int i = 0; i < 100; i++) {
List<PartitionAssignment> results = clusterControl.replicaPlacer().place(
- new PlacementSpec(0,
- 1,
- (short) 3),
- clusterControl::usableBrokers
+ new PlacementSpec(0,
+ 1,
+ (short) 3),
+ new ClusterDescriber() {
+ @Override
+ public Iterator<UsableBroker> usableBrokers() {
+ return clusterControl.usableBrokers();
+ }
+
+ @Override
+ public Uuid defaultDir(int brokerId) {
+ return DirectoryId.UNASSIGNED;
+ }
+ }
).assignments();
HashSet<Integer> seen = new HashSet<>();
for (Integer result : results.get(0).replicas()) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 8b346b1c612..2e7b91be396 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.controller.PartitionChangeBuilder.ElectionResult;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.DefaultDirProvider;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -48,6 +49,7 @@ import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -56,6 +58,8 @@ import static org.junit.jupiter.params.provider.Arguments.arguments;
@Timeout(value = 40)
public class PartitionChangeBuilderTest {
+ private static final DefaultDirProvider DEFAULT_DIR_PROVIDER = brokerId -> DirectoryId.UNASSIGNED;
+
private static Stream<Arguments> partitionChangeRecordVersions() {
return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version));
}
@@ -127,13 +131,24 @@ public class PartitionChangeBuilderTest {
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {
- return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion, 2);
+ return new PartitionChangeBuilder(FOO,
+ FOO_ID,
+ 0,
+ r -> r != 3,
+ metadataVersion,
+ 2)
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static PartitionChangeBuilder createFooBuilder(short version) {
- return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 2).
- setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+ return new PartitionChangeBuilder(FOO,
+ FOO_ID,
+ 0,
+ r -> r != 3,
+ metadataVersionForPartitionChangeRecordVersion(version),
+ 2).
+ setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@@ -160,9 +175,14 @@ public class PartitionChangeBuilderTest {
}
private static PartitionChangeBuilder createBarBuilder(short version) {
- return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version), 2).
- setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+ return new PartitionChangeBuilder(BAR,
+ BAR_ID,
+ 0,
+ r -> r != 3,
+ metadataVersionForPartitionChangeRecordVersion(version),
+ 2).
+ setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
@@ -182,9 +202,14 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A");
private static PartitionChangeBuilder createBazBuilder(short version) {
- return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true,
- metadataVersionForPartitionChangeRecordVersion(version), 2).
- setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+ return new PartitionChangeBuilder(BAZ,
+ BAZ_ID,
+ 0,
+ __ -> true,
+ metadataVersionForPartitionChangeRecordVersion(version),
+ 2).
+ setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static final PartitionRegistration OFFLINE_WITHOUT_ELR = new PartitionRegistration.Builder().
@@ -203,6 +228,11 @@ public class PartitionChangeBuilderTest {
private static final PartitionRegistration OFFLINE_WITH_ELR = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("CQEqt7trRrmqyNxUT1CY0g"),
+ Uuid.fromString("59Mb9smoSsC0bGUP2FYV8A"),
+ Uuid.fromString("LBTmsCVJREqJuIEtwqxRDg")
+ }).
setElr(new int[] {3}).
setIsr(new int[] {}).
setLastKnownElr(new int[] {2}).
@@ -219,10 +249,14 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
if (metadataVersion.isElrSupported()) {
return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1,
- metadataVersion, 2).setEligibleLeaderReplicasEnabled(true);
+ metadataVersion, 2).
+ setEligibleLeaderReplicasEnabled(true).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
} else {
return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1,
- metadataVersion, 2).setEligibleLeaderReplicasEnabled(false);
+ metadataVersion, 2).
+ setEligibleLeaderReplicasEnabled(false).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
}
@@ -792,6 +826,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
// Update ISR to {1, 2}
@@ -843,6 +878,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));
@@ -888,6 +924,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 4)));
@@ -939,6 +976,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
builder.setUncleanShutdownReplicas(Arrays.asList(3));
@@ -967,7 +1005,7 @@ public class PartitionChangeBuilderTest {
}
@Test
- void testKeepsDirectoriesAfterReassignment() {
+ public void testKeepsDirectoriesAfterReassignment() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[]{2, 1, 3}).
setDirectories(new Uuid[]{
@@ -983,16 +1021,20 @@ public class PartitionChangeBuilderTest {
build();
Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
0, r -> true, MetadataVersion.IBP_3_7_IV2, 2).
- setTargetReplicas(Arrays.asList(3, 1, 4)).build();
+ setTargetReplicas(Arrays.asList(3, 1, 5, 4)).
+ setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
+ build();
Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setLeader(1).
- setReplicas(Arrays.asList(3, 1, 4)).
+ setReplicas(Arrays.asList(3, 1, 5, 4)).
setDirectories(Arrays.asList(
Uuid.fromString("fM5NKyWTQHqEihjIkUl99Q"),
Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"),
+ Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg"),
DirectoryId.UNASSIGNED
)),
(short) 1
@@ -1000,12 +1042,48 @@ public class PartitionChangeBuilderTest {
assertEquals(expected, built);
}
+ @Test
+ public void testUpdateDirectories() {
+ PartitionRegistration registration = new PartitionRegistration.Builder().
+ setReplicas(new int[]{2, 1, 3}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("S1zMYZczRjWmucidLqGA5g"),
+ Uuid.fromString("9eRNXTvFTsWUJObvW51V5A"),
+ Uuid.fromString("UpePYVBgRAi3c4ujQrf3Kg")
+ }).
+ setIsr(new int[]{2, 1, 3}).
+ setLeader(2).
+ setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+ setLeaderEpoch(100).
+ setPartitionEpoch(200).
+ build();
+ Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
+ 0, r -> true, withDirectoryAssignmentSupport(MetadataVersion.latest()), 2).
+ setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
+ setDirectory(1, DirectoryId.LOST).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
+ build();
+ Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
+ new PartitionChangeRecord().
+ setTopicId(FOO_ID).
+ setPartitionId(0).
+ setDirectories(Arrays.asList(
+ Uuid.fromString("S1zMYZczRjWmucidLqGA5g"),
+ DirectoryId.LOST,
+ Uuid.fromString("pN1VKs9zRzK4APflpegAVg")
+ )),
+ (short) 2
+ ));
+ assertEquals(expected, built);
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEnabled) {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
+ .setDirectories(DirectoryId.migratingArray(4))
.setIsr(new int[] {1})
.setElr(new int[] {3})
.setLastKnownElr(lastKnownLeaderEnabled ? new int[] {} : new int[] {2})
@@ -1021,7 +1099,8 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
- .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
+ .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled)
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
builder.setTargetIsr(Collections.emptyList());
@@ -1048,6 +1127,12 @@ public class PartitionChangeBuilderTest {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
+ .setDirectories(new Uuid[]{
+ Uuid.fromString("MrTKKPEpRv66ZpWv4V7EBQ"),
+ Uuid.fromString("CkvgdEcWTVmdhfNuJXL0xA"),
+ Uuid.fromString("4a2coMsPRkSCsiTVWSksSw"),
+ Uuid.fromString("tmPdVjzASZ2ZqiS0cVJvtQ")
+ })
.setIsr(new int[] {1, 2, 3, 4})
.setElr(new int[] {})
.setLastKnownElr(new int[] {})
@@ -1063,6 +1148,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
builder.setTargetIsr(Collections.emptyList());
@@ -1090,6 +1176,7 @@ public class PartitionChangeBuilderTest {
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setUncleanShutdownReplicas(Arrays.asList(2))
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
assertTrue(changeRecord.lastKnownELR() == null, changeRecord.toString());
@@ -1104,6 +1191,7 @@ public class PartitionChangeBuilderTest {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
+ .setDirectories(DirectoryId.migratingArray(4))
.setIsr(new int[] {})
.setElr(new int[] {})
.setLastKnownElr(new int[] {1})
@@ -1118,6 +1206,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setUseLastKnownLeaderInBalancedRecovery(true)
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setEligibleLeaderReplicasEnabled(true);
builder.setTargetIsr(Collections.emptyList());
@@ -1144,6 +1233,12 @@ public class PartitionChangeBuilderTest {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
+ .setDirectories(new Uuid[]{
+ Uuid.fromString("zANDdMukTEqefOvHpmniMg"),
+ Uuid.fromString("Ui2Eq8rbRiuW7m7uiPTRyg"),
+ Uuid.fromString("MhgJOZrrTsKNcGM0XKK4aA"),
+ Uuid.fromString("Y25PaCAmRfyGIKxAThhBAw")
+ })
.setIsr(new int[] {})
.setElr(new int[] {3})
.setLastKnownElr(new int[] {1})
@@ -1158,6 +1253,7 @@ public class PartitionChangeBuilderTest {
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
+ .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);
builder.setTargetIsr(Collections.emptyList());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index a488e50639d..dd1d567e587 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.placement.PartitionAssignment;
@@ -134,6 +135,11 @@ public class PartitionReassignmentReplicasTest {
assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 3, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("HEKOeWDdQOqr2cmHrnjqjA"),
+ Uuid.fromString("I8kmmcM5TjOwNFnGvJLCjA"),
+ Uuid.fromString("x8osEoRkQdupZNYpU5c3Lw"),
+ Uuid.fromString("OT6qgtRqTiuiX8EikvAVow")}).
setIsr(new int[]{0, 1, 3, 2}).
setRemovingReplicas(new int[]{2}).
setAddingReplicas(new int[]{3}).
@@ -145,6 +151,12 @@ public class PartitionReassignmentReplicasTest {
assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 3, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("QrbOddSYQg6JgFu7hLvOTg"),
+ Uuid.fromString("S585FNNoSmiSH6ZYCrNqCg"),
+ Uuid.fromString("wjT5ieLARfKYMWIzTFwcag"),
+ Uuid.fromString("qzX9qWPVTWuLbiEQL0cgeg")
+ }).
setIsr(new int[]{0, 1, 3, 2}).
setRemovingReplicas(new int[]{2}).
setLeader(0).
@@ -155,6 +167,12 @@ public class PartitionReassignmentReplicasTest {
assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 3, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("QIyJnfdUSz6laFLCgj3AjA"),
+ Uuid.fromString("1QIvvBx2QVqNw2dsnYXUZg"),
+ Uuid.fromString("yPvPnGrxR0q8KC2Q5k0FIg"),
+ Uuid.fromString("a0lnxzleTcWVf1IyalE9cA")
+ }).
setIsr(new int[]{0, 1, 3, 2}).
setAddingReplicas(new int[]{3}).
setLeader(0).
@@ -165,6 +183,11 @@ public class PartitionReassignmentReplicasTest {
assertFalse(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("I4qCCBe9TYGOB0xvmvTI7w"),
+ Uuid.fromString("JvzGem0nTxiNPM5jIzNzlA"),
+ Uuid.fromString("EfWjZ2EsSKSvEn9PkG7lWQ")
+ }).
setIsr(new int[]{0, 1, 2}).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
index 69a617c35b9..05148813e81 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
import java.util.Arrays;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.junit.jupiter.api.Test;
@@ -35,6 +36,11 @@ public class PartitionReassignmentRevertTest {
public void testNoneAddedOrRemoved() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {3, 2, 1}).setIsr(new int[] {3, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("Qln01zZAQMKzFTRCw22Y4w"),
+ Uuid.fromString("jjUcnIL2TxWEGMZ1mHvkPA"),
+ Uuid.fromString("JSZNFA0uQFmH1N75hQxWug")
+ }).
setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -46,6 +52,11 @@ public class PartitionReassignmentRevertTest {
public void testSomeRemoving() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {3, 2, 1}).setIsr(new int[] {3, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("WG58zlb5RR6TqdI81SCXeA"),
+ Uuid.fromString("izoB1H6TQdOExQ4XNMNXeQ"),
+ Uuid.fromString("TluNaJDjRemuy17sO7dDKg")
+ }).
setRemovingReplicas(new int[]{2, 1}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -57,6 +68,13 @@ public class PartitionReassignmentRevertTest {
public void testSomeAdding() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {4, 5, 3, 2, 1}).setIsr(new int[] {4, 5, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("gR9P3R9NQ5GhtewattwuZw"),
+ Uuid.fromString("vzgieGUjSr6vPvl3ZAWQcg"),
+ Uuid.fromString("8UWF5CQfQDqSyzcChmbrgw"),
+ Uuid.fromString("X3J9b4K5TumAM5a3YOKk2w"),
+ Uuid.fromString("LjZGhHfFRSCSCQw42dLlNA")
+ }).
setAddingReplicas(new int[]{4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -68,6 +86,13 @@ public class PartitionReassignmentRevertTest {
public void testSomeRemovingAndAdding() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {4, 5, 3, 2, 1}).setIsr(new int[] {4, 5, 2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("IHR5DKGdQju05pbDpwfdbA"),
+ Uuid.fromString("9zsVmGReTDOAyuPEtp58Cw"),
+ Uuid.fromString("bsUouEfRSLi50Pj3nqke2A"),
+ Uuid.fromString("8l9R5BMcQZGbICOXPmxZNw"),
+ Uuid.fromString("3n5Gwv8jRMiIFMgoTxVCdA")
+ }).
setRemovingReplicas(new int[]{2}).setAddingReplicas(new int[]{4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
@@ -79,6 +104,13 @@ public class PartitionReassignmentRevertTest {
public void testIsrSpecialCase() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {4, 5, 3, 2, 1}).setIsr(new int[] {4, 5}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("1oXnuHL6T8y7yMtEP4FSdg"),
+ Uuid.fromString("3ddu6izxT2aCpQrA3C2bjw"),
+ Uuid.fromString("WvCpqmaZTaSbifd43Vl2Xg"),
+ Uuid.fromString("n0cmj6NgTaahRMwa75FnRA"),
+ Uuid.fromString("S2mDcyiAQAe92ZrlyodaDw")
+ }).
setRemovingReplicas(new int[]{2}).setAddingReplicas(new int[]{4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 8ecdafe65ea..82dabed8bd6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -141,6 +141,7 @@ import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXC
import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -345,15 +346,31 @@ public class ReplicationControlManagerTest {
}
void registerBrokers(Integer... brokerIds) throws Exception {
- for (int brokerId : brokerIds) {
+ Object[] brokersAndDirs = new Object[brokerIds.length * 2];
+ for (int i = 0; i < brokerIds.length; i++) {
+ brokersAndDirs[i * 2] = brokerIds[i];
+ brokersAndDirs[i * 2 + 1] = Collections.singletonList(DirectoryId.UNASSIGNED);
+ }
+ registerBrokersWithDirs(brokersAndDirs);
+ }
+
+ @SuppressWarnings("unchecked")
+ void registerBrokersWithDirs(Object... brokerIdsAndDirs) throws Exception {
+ if (brokerIdsAndDirs.length % 2 != 0) {
+ throw new IllegalArgumentException("uneven number of arguments");
+ }
+ for (int i = 0; i < brokerIdsAndDirs.length / 2; i++) {
+ int brokerId = (int) brokerIdsAndDirs[i * 2];
+ List<Uuid> logDirs = (List<Uuid>) brokerIdsAndDirs[i * 2 + 1];
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
- setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).setRack(null);
+ setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).
+ setRack(null).setLogDirs(logDirs);
brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092 + brokerId).
setName("PLAINTEXT").
setHost("localhost"));
- replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 0)));
+ replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 3)));
}
}
@@ -508,7 +525,9 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopics() throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+ .build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@@ -550,6 +569,7 @@ public class ReplicationControlManagerTest {
assertEquals(expectedResponse3, result3.response());
ctx.replay(result3.records());
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(),
replicationControl.getPartition(
((TopicRecord) result3.records().get(0).message()).topicId(), 0));
@@ -584,7 +604,9 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopicsISRInvariants() throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+ .build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -612,6 +634,7 @@ public class ReplicationControlManagerTest {
// cannot be in the ISR because it is in controlled shutdown.
assertEquals(
new PartitionRegistration.Builder().setReplicas(new int[]{1, 0, 2}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[]{0}).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1515,6 +1538,7 @@ public class ReplicationControlManagerTest {
// cannot be in the ISR because it is in controlled shutdown.
assertEquals(
new PartitionRegistration.Builder().setReplicas(new int[]{0, 1, 2}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[]{0}).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1568,7 +1592,10 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testReassignPartitions(short version) throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ MetadataVersion metadataVersion = withDirectoryAssignmentSupport(MetadataVersion.latest());
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
@@ -1634,9 +1661,9 @@ public class ReplicationControlManagerTest {
new PartitionChangeRecord().setTopicId(fooId).
setPartitionId(1).
setReplicas(asList(2, 1, 3)).
+ setDirectories(asList(DirectoryId.migratingArray(3))).
setLeader(3).
setRemovingReplicas(Collections.emptyList()).
- setDirectories(Arrays.asList(DirectoryId.unassignedArray(3))).
setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
@@ -1686,7 +1713,9 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+ .build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -1702,6 +1731,7 @@ public class ReplicationControlManagerTest {
assertEquals(
new PartitionRegistration.Builder().
setReplicas(new int[] {1, 2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(4)).
setIsr(new int[] {1, 2, 4}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1832,7 +1862,9 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+ .build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -1844,6 +1876,7 @@ public class ReplicationControlManagerTest {
assertEquals(
new PartitionRegistration.Builder().
setReplicas(new int[] {1, 2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(4)).
setIsr(new int[] {1, 2, 3, 4}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -1886,7 +1919,10 @@ public class ReplicationControlManagerTest {
@Test
public void testCancelReassignPartitions() throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ MetadataVersion metadataVersion = withDirectoryAssignmentSupport(MetadataVersion.latest());
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -1900,6 +1936,7 @@ public class ReplicationControlManagerTest {
replication.handleBrokerFenced(3, fenceRecords);
ctx.replay(fenceRecords);
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).setIsr(new int[] {1, 2, 4}).
+ setDirectories(DirectoryId.migratingArray(4)).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build(), replication.getPartition(fooId, 0));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
@@ -1937,10 +1974,13 @@ public class ReplicationControlManagerTest {
alterResult.response());
ctx.replay(alterResult.records());
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2}).
+ setDirectories(DirectoryId.migratingArray(3)).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).
+ setDirectories(DirectoryId.migratingArray(4)).
setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(fooId, 1));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 0}).setIsr(new int[] {4, 2}).
+ setDirectories(DirectoryId.migratingArray(5)).
setAddingReplicas(new int[] {0, 1}).setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(barId, 0));
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
@@ -1987,8 +2027,8 @@ public class ReplicationControlManagerTest {
setPartitionId(0).
setLeader(4).
setReplicas(asList(2, 3, 4)).
+ setDirectories(asList(DirectoryId.migratingArray(3))).
setRemovingReplicas(null).
- setDirectories(Arrays.asList(DirectoryId.unassignedArray(3))).
setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
@@ -2001,6 +2041,7 @@ public class ReplicationControlManagerTest {
ctx.replay(cancelResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {2, 3, 4}).setIsr(new int[] {4, 2}).
+ setDirectories(DirectoryId.migratingArray(3)).
setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(3).build(), replication.getPartition(barId, 0));
}
@@ -2014,7 +2055,9 @@ public class ReplicationControlManagerTest {
@Test
public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest()))
+ .build();
ctx.registerBrokers(0, 1, 2, 3, 4, 5);
ctx.unfenceBrokers(0, 1, 2);
Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
@@ -2022,6 +2065,7 @@ public class ReplicationControlManagerTest {
INVALID_REPLICA_ASSIGNMENT.code());
ctx.createPartitions(2, "foo", new int[][] {new int[] {2, 4, 5}}, NONE.code());
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {2, 4, 5}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[] {2}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(),
ctx.replicationControl.getPartition(fooId, 1));
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
index 1f6adf93c57..b7c28bab183 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
@@ -47,6 +47,7 @@ import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.Fak
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakePartitionRegistration;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakeTopicImage;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakeTopicsImage;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControllerMetadataMetricsPublisherTest {
@@ -145,7 +146,7 @@ public class ControllerMetadataMetricsPublisherTest {
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
ImageReWriter writer = new ImageReWriter(delta);
IMAGE1.write(writer, new ImageWriterOptions.Builder().
- setMetadataVersion(delta.image().features().metadataVersion()).
+ setMetadataVersion(withDirectoryAssignmentSupport(delta.image().features().metadataVersion())).
build());
env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
assertEquals(0, env.metrics.activeBrokerCount());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
index eaf789619d1..93473abed1a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
@@ -36,6 +36,7 @@ import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.Fak
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.OFFLINE;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.fakePartitionRegistration;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ControllerMetricsChangesTest {
@@ -156,7 +157,7 @@ public class ControllerMetricsChangesTest {
static {
ImageWriterOptions options = new ImageWriterOptions.Builder().
- setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build(); // highest MV for PartitionRecord v0
+ setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.IBP_3_7_IV0)).build(); // highest MV for PartitionRecord v0
TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 0, options).message());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
index 96c51471188..a0e3a5cd86a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.kafka.controller.metrics;
import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
@@ -73,6 +74,7 @@ public class ControllerMetricsTestUtils {
}
return new PartitionRegistration.Builder().
setReplicas(new int[] {0, 1, 2}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[] {0, 1, 2}).
setLeader(leader).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index 213e3c63393..fc228bb5c84 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -38,9 +38,8 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
@Timeout(value = 40)
@@ -133,8 +132,7 @@ public class ImageDowngradeTest {
@Test
void testDirectoryAssignmentState() {
MetadataVersion outputMetadataVersion = MetadataVersion.IBP_3_7_IV0;
- MetadataVersion inputMetadataVersion = spy(outputMetadataVersion); // TODO replace with actual MV after bump for KIP-858
- when(inputMetadataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
+ MetadataVersion inputMetadataVersion = withDirectoryAssignmentSupport(outputMetadataVersion);
PartitionRecord testPartitionRecord = (PartitionRecord) TEST_RECORDS.get(1).message();
writeWithExpectedLosses(outputMetadataVersion,
Collections.singletonList("the directory assignment state of one or more replicas"),
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 0a0d97dd558..db366b0281d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Timeout;
import java.util.List;
import java.util.Optional;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -114,7 +115,7 @@ public class MetadataImageTest {
private static void testToImage(MetadataImage image) {
testToImage(image, new ImageWriterOptions.Builder()
- .setMetadataVersion(image.features().metadataVersion())
+ .setMetadataVersion(withDirectoryAssignmentSupport(image.features().metadataVersion()))
.build(), Optional.empty());
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index d9bf0876714..9ef72125b66 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.image;
+import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -28,6 +29,7 @@ import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
@@ -47,6 +49,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHAN
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -101,13 +104,17 @@ public class TopicsImageTest {
TOPIC_IMAGES1 = Arrays.asList(
newTopicImage("foo", FOO_UUID,
new PartitionRegistration.Builder().setReplicas(new int[] {2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[] {2, 3}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(345).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {3, 4, 5}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[] {3, 4, 5}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(4).setPartitionEpoch(684).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {2, 4, 5}).
+ setDirectories(DirectoryId.migratingArray(3)).
setIsr(new int[] {2, 4, 5}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(10).setPartitionEpoch(84).build()),
newTopicImage("bar", BAR_UUID,
new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(5)).
setIsr(new int[] {0, 1, 2, 3}).setRemovingReplicas(new int[] {1}).setAddingReplicas(new int[] {3, 4}).setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(345).build()));
IMAGE1 = new TopicsImage(newTopicsByIdMap(TOPIC_IMAGES1), newTopicsByNameMap(TOPIC_IMAGES1));
@@ -140,9 +147,11 @@ public class TopicsImageTest {
List<TopicImage> topics2 = Arrays.asList(
newTopicImage("bar", BAR_UUID,
new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(5)).
setIsr(new int[] {0, 1, 2, 3}).setRemovingReplicas(new int[] {1}).setAddingReplicas(new int[] {3, 4}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(2).setPartitionEpoch(346).build()),
newTopicImage("baz", BAZ_UUID,
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(4)).
setIsr(new int[] {3, 4}).setRemovingReplicas(new int[] {2}).setAddingReplicas(new int[] {1}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(2).setPartitionEpoch(1).build()));
IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
}
@@ -162,8 +171,13 @@ public class TopicsImageTest {
}
private PartitionRegistration newPartition(int[] replicas) {
+ Uuid[] directories = new Uuid[replicas.length];
+ for (int i = 0; i < replicas.length; i++) {
+ directories[i] = DirectoryId.random();
+ }
return new PartitionRegistration.Builder().
setReplicas(replicas).
+ setDirectories(directories).
setIsr(replicas).
setLeader(replicas[0]).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -426,7 +440,9 @@ public class TopicsImageTest {
private static List<ApiMessageAndVersion> getImageRecords(TopicsImage image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new ImageWriterOptions.Builder().build());
+ image.write(writer, new ImageWriterOptions.Builder().
+ setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest())).
+ build());
return writer.records();
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index f4494a72b01..78b7ff899fc 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -36,12 +36,11 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.kafka.metadata.util.MetadataFeatureUtil.withDirectoryAssignmentSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
@Timeout(value = 40)
public class BrokerRegistrationTest {
@@ -144,10 +143,8 @@ public class BrokerRegistrationTest {
}
private void testRoundTrip(BrokerRegistration registration) {
- MetadataVersion metdataVersion = spy(MetadataVersion.latest()); // TODO replace with actual MV after bump for KIP-858
- when(metdataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
ImageWriterOptions options = new ImageWriterOptions.Builder().
- setMetadataVersion(metdataVersion).
+ setMetadataVersion(withDirectoryAssignmentSupport(MetadataVersion.latest())).
build();
ApiMessageAndVersion messageAndVersion = registration.
toRecord(options);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index b92a9f62221..3c8a402aa05 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -44,6 +44,7 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -64,11 +65,14 @@ public class PartitionRegistrationTest {
@Test
public void testPartitionControlInfoMergeAndDiff() {
PartitionRegistration a = new PartitionRegistration.Builder().
- setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
+ setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
+ setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
PartitionRegistration b = new PartitionRegistration.Builder().
- setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{3}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build();
+ setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
+ setIsr(new int[]{3}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build();
PartitionRegistration c = new PartitionRegistration.Builder().
- setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
+ setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
+ setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
assertEquals(b, a.merge(new PartitionChangeRecord().
setLeader(3).setIsr(Arrays.asList(3))));
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
@@ -80,7 +84,9 @@ public class PartitionRegistrationTest {
@Test
public void testRecordRoundTrip() {
PartitionRegistration registrationA = new PartitionRegistration.Builder().
- setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
+ setReplicas(new int[]{1, 2, 3}).
+ setDirectories(DirectoryId.migratingArray(3)).
+ setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
@@ -93,9 +99,21 @@ public class PartitionRegistrationTest {
@Test
public void testToLeaderAndIsrPartitionState() {
PartitionRegistration a = new PartitionRegistration.Builder().
- setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(123).setPartitionEpoch(456).build();
+ setReplicas(new int[]{1, 2, 3}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("NSmkU0ieQuy2IHN59Ce0Bw"),
+ Uuid.fromString("Y8N9gnSKSLKKFCioX2laGA"),
+ Uuid.fromString("Oi7nvb8KQPyaGEqr4JtCRw")
+ }).
+ setIsr(new int[]{1, 2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(123).setPartitionEpoch(456).build();
PartitionRegistration b = new PartitionRegistration.Builder().
- setReplicas(new int[]{2, 3, 4}).setIsr(new int[]{2, 3, 4}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(234).setPartitionEpoch(567).build();
+ setReplicas(new int[]{2, 3, 4}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("tAn3q03aQAWEYkNajXm3lA"),
+ Uuid.fromString("zgj8rqatTmWMyWBsRZyiVg"),
+ Uuid.fromString("bAAlGAz1TN2doZjtWlvhRQ")
+ }).
+ setIsr(new int[]{2, 3, 4}).setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(234).setPartitionEpoch(567).build();
assertEquals(new LeaderAndIsrPartitionState().
setTopicName("foo").
setPartitionIndex(1).
@@ -159,7 +177,7 @@ public class PartitionRegistrationTest {
@Test
public void testBuilderThrowsIllegalStateExceptionWhenMissingIsr() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
- setReplicas(new int[]{0});
+ setReplicas(new int[]{0}).setDirectories(new Uuid[]{DirectoryId.UNASSIGNED});
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> builder.build());
assertEquals("You must set isr.", exception.getMessage());
}
@@ -168,6 +186,7 @@ public class PartitionRegistrationTest {
public void testBuilderThrowsIllegalStateExceptionWhenMissingLeader() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0}).
+ setDirectories(new Uuid[]{DirectoryId.LOST}).
setIsr(new int[]{0}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{0});
@@ -180,6 +199,7 @@ public class PartitionRegistrationTest {
public void testBuilderThrowsIllegalStateExceptionWhenMissingLeaderRecoveryState() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0}).
+ setDirectories(new Uuid[]{DirectoryId.MIGRATING}).
setIsr(new int[]{0}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{0}).
@@ -192,6 +212,7 @@ public class PartitionRegistrationTest {
public void testBuilderThrowsIllegalStateExceptionWhenMissingLeaderEpoch() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0}).
+ setDirectories(new Uuid[]{Uuid.fromString("OP4I696sRmCPanlNidxJYw")}).
setIsr(new int[]{0}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{0}).
@@ -205,6 +226,7 @@ public class PartitionRegistrationTest {
public void testBuilderThrowsIllegalStateExceptionWhenMissingPartitionEpoch() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0}).
+ setDirectories(DirectoryId.migratingArray(1)).
setIsr(new int[]{0}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{0}).
@@ -219,6 +241,7 @@ public class PartitionRegistrationTest {
public void testBuilderSuccess() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 2}).
+ setDirectories(DirectoryId.unassignedArray(3)).
setIsr(new int[]{0, 1}).
setElr(new int[]{2}).
setLastKnownElr(new int[]{0, 1, 2}).
@@ -245,6 +268,7 @@ public class PartitionRegistrationTest {
public void testBuilderSetsDefaultAddingAndRemovingReplicas() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1}).
+ setDirectories(DirectoryId.migratingArray(2)).
setIsr(new int[]{0, 1}).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -325,6 +349,7 @@ public class PartitionRegistrationTest {
public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 2, 3, 4}).
+ setDirectories(DirectoryId.migratingArray(5)).
setIsr(new int[]{0, 1}).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@@ -340,7 +365,7 @@ public class PartitionRegistrationTest {
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
setLeaderEpoch(0).
- setDirectories(Arrays.asList(DirectoryId.unassignedArray(5))).
+ setDirectories(Arrays.asList(DirectoryId.migratingArray(5))).
setPartitionEpoch(0);
List<UnwritableMetadataException> exceptions = new ArrayList<>();
ImageWriterOptions options = new ImageWriterOptions.Builder().
@@ -370,25 +395,65 @@ public class PartitionRegistrationTest {
Arbitrary<PartitionRegistration> uniqueSamples() {
return Arbitraries.of(
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+ setDirectories(new Uuid[]{Uuid.fromString("HyTsxr8hT6Gq5heZMA2Bug"), Uuid.fromString("ePwTiSgFRvaKRBaUX3EcZQ"), Uuid.fromString("F3zwSDR1QWGKNNLMowVoYg")}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+ setDirectories(new Uuid[]{Uuid.fromString("94alcrMLQ6GOV8EHfAxJnA"), Uuid.fromString("LlD2QCA5RpalzKwPsUTGpw"), Uuid.fromString("Ahfjx9j5SIKpmz48pTLFRg")}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(101).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+ setDirectories(new Uuid[]{Uuid.fromString("KcXLjTpYSPGjM20DjHd5rA"), Uuid.fromString("NXiBSMNHSvWqvz3qM8a6Vg"), Uuid.fromString("yWinzh1DRD25nHuXUxLfBQ")}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).setElr(new int[] {1, 2}).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
+ setDirectories(new Uuid[]{Uuid.fromString("9bDLWtoRRaKUToKixl3NUg"), Uuid.fromString("nLJMwhSUTEOU7DEI0U2GOw"), Uuid.fromString("ULAltTBAQlG2peJh9DZZrw")}).
setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1}).
+ setDirectories(new Uuid[]{Uuid.fromString("kWM0QcMoRg6BHc7sdVsjZg"), Uuid.fromString("84F4VbPGTRWewKhlCYctbQ"), Uuid.fromString("W505iUM0S6a5Ds83d1WjcQ")}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING).setLeaderEpoch(100).setPartitionEpoch(200).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {4, 5, 6}).setAddingReplicas(new int[] {1, 2, 3}).
+ setDirectories(DirectoryId.unassignedArray(6)).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
+ setDirectories(DirectoryId.migratingArray(6)).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 3}).
+ setDirectories(DirectoryId.unassignedArray(6)).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
+ setDirectories(DirectoryId.migratingArray(6)).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {2, 3}).setLastKnownElr(new int[] {1, 2}).build()
);
}
+ @Test
+ public void testDirectories() {
+ PartitionRegistration partitionRegistration = new PartitionRegistration.Builder().
+ setReplicas(new int[] {3, 2, 1}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("FbRuu7CeQtq5YFreEzg16g"),
+ Uuid.fromString("4rtHTelWSSStAFMODOg3cQ"),
+ Uuid.fromString("Id1WXzHURROilVxZWJNZlw")
+ }).
+ setIsr(new int[] {1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+ setLeaderEpoch(100).setPartitionEpoch(200).build();
+ assertEquals(Uuid.fromString("Id1WXzHURROilVxZWJNZlw"), partitionRegistration.directory(1));
+ assertEquals(Uuid.fromString("4rtHTelWSSStAFMODOg3cQ"), partitionRegistration.directory(2));
+ assertEquals(Uuid.fromString("FbRuu7CeQtq5YFreEzg16g"), partitionRegistration.directory(3));
+ assertThrows(IllegalArgumentException.class, () -> partitionRegistration.directory(4));
+ }
+
+ @Test
+ public void testMigratingRecordDirectories() {
+ PartitionRecord record = new PartitionRecord().
+ setTopicId(Uuid.fromString("ONlQ7DDzQtGESsG499UDQg")).
+ setPartitionId(0).
+ setReplicas(Arrays.asList(0, 1)).
+ setIsr(Arrays.asList(0, 1)).
+ setLeader(0).
+ setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
+ setLeaderEpoch(0).
+ setPartitionEpoch(0);
+ PartitionRegistration registration = new PartitionRegistration(record);
+ assertArrayEquals(new Uuid[]{DirectoryId.MIGRATING, DirectoryId.MIGRATING}, registration.directories);
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java
index b5aacc71525..06cf5ae50d5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.metadata.placement;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -61,7 +62,13 @@ public class PartitionAssignmentTest {
@Test
public void testToString() {
List<Integer> replicas = Arrays.asList(0, 1, 2);
- PartitionAssignment partitionAssignment = new PartitionAssignment(replicas);
- assertEquals("PartitionAssignment(replicas=[0, 1, 2])", partitionAssignment.toString());
+ List<Uuid> directories = Arrays.asList(
+ Uuid.fromString("65WMNfybQpCDVulYOxMCTw"),
+ Uuid.fromString("VkZ5AkuESPGkMc2OxpKUjw"),
+ Uuid.fromString("wFtTi4FxTlOhhHytfxv7fQ")
+ );
+ PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, directories::get);
+ assertEquals("PartitionAssignment(replicas=[0, 1, 2], " +
+ "directories=[65WMNfybQpCDVulYOxMCTw, VkZ5AkuESPGkMc2OxpKUjw, wFtTi4FxTlOhhHytfxv7fQ])", partitionAssignment.toString());
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
index ef11845ee6e..8b02416d2be 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.metadata.placement;
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer.BrokerList;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer.RackList;
@@ -27,6 +29,7 @@ import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -94,7 +97,17 @@ public class StripedReplicaPlacerTest {
PlacementSpec placementSpec = new PlacementSpec(startPartition,
numPartitions,
replicationFactor);
- return placer.place(placementSpec, brokers::iterator);
+ return placer.place(placementSpec, new ClusterDescriber() {
+ @Override
+ public Iterator<UsableBroker> usableBrokers() {
+ return brokers.iterator();
+ }
+
+ @Override
+ public Uuid defaultDir(int brokerId) {
+ return DirectoryId.UNASSIGNED;
+ }
+ });
}
/**
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
index 61b2cbd6fa0..7b5a24c3b84 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.metadata.placement;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -74,10 +75,16 @@ public class TopicAssignmentTest {
@Test
public void testToString() {
List<Integer> replicas = Arrays.asList(0, 1, 2);
+ List<Uuid> directories = Arrays.asList(
+ Uuid.fromString("v56qeYzNRrqNtXsxzcReog"),
+ Uuid.fromString("MvUIAsOiRlSePeiBHdZrSQ"),
+ Uuid.fromString("jUqCchHtTHqMxeVv4dw1RA")
+ );
List<PartitionAssignment> partitionAssignments = Arrays.asList(
- new PartitionAssignment(replicas)
+ new PartitionAssignment(replicas, directories::get)
);
TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
- assertEquals("TopicAssignment(assignments=[PartitionAssignment(replicas=[0, 1, 2])])", topicAssignment.toString());
+ assertEquals("TopicAssignment(assignments=[PartitionAssignment(replicas=[0, 1, 2], " +
+ "directories=[v56qeYzNRrqNtXsxzcReog, MvUIAsOiRlSePeiBHdZrSQ, jUqCchHtTHqMxeVv4dw1RA])])", topicAssignment.toString());
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java b/metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java
similarity index 59%
copy from metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
copy to metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java
index 8aaa092205e..a01e47dbc75 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.kafka.metadata.placement;
+package org.apache.kafka.metadata.util;
-import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.mockito.internal.util.MockUtil;
-import java.util.Iterator;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
-
-/**
- * Can describe a cluster to a ReplicaPlacer.
- */
-@InterfaceStability.Unstable
-public interface ClusterDescriber {
- /**
- * Get an iterator through the usable brokers.
- */
- Iterator<UsableBroker> usableBrokers();
+public class MetadataFeatureUtil {
+ public static MetadataVersion withDirectoryAssignmentSupport(MetadataVersion metadataVersion) {
+ MetadataVersion spy = MockUtil.isMock(metadataVersion) ? metadataVersion : spy(metadataVersion);
+ when(spy.isDirectoryAssignmentSupported()).thenReturn(true);
+ return spy;
+ }
}
diff --git a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
index 48008a81e4f..4233198be05 100644
--- a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
+++ b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -120,8 +121,38 @@ public class DirectoryId {
* Create an array with the specified number of entries set to {@link #UNASSIGNED}.
*/
public static Uuid[] unassignedArray(int length) {
+ return array(length, UNASSIGNED);
+ }
+
+ /**
+ * Create an array with the specified number of entries set to {@link #MIGRATING}.
+ */
+ public static Uuid[] migratingArray(int length) {
+ return array(length, MIGRATING);
+ }
+
+ /**
+ * Create an array with the specified number of entries set to the specified value.
+ */
+ private static Uuid[] array(int length, Uuid value) {
Uuid[] array = new Uuid[length];
- Arrays.fill(array, UNASSIGNED);
+ Arrays.fill(array, value);
return array;
}
+
+ /**
+ * Check if a directory is online, given a sorted list of online directories.
+ * @param dir The directory to check
+ * @param sortedOnlineDirs The sorted list of online directories
+ * @return true if the directory is considered online, false otherwise
+ */
+ public static boolean isOnline(Uuid dir, List<Uuid> sortedOnlineDirs) {
+ if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) {
+ return true;
+ }
+ if (LOST.equals(dir)) {
+ return false;
+ }
+ return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
+ }
}
diff --git a/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java b/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
index 7a984c7cf8a..5b4d427f275 100644
--- a/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
+++ b/server-common/src/test/java/org/apache/kafka/common/DirectoryIdTest.java
@@ -20,8 +20,10 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,50 +43,10 @@ public class DirectoryIdTest {
assertTrue(DirectoryId.reserved(DirectoryId.MIGRATING));
}
- @Test
- void testCreateDirectoriesFrom() {
- assertThrows(IllegalArgumentException.class, () -> DirectoryId.createDirectoriesFrom(
- new int[] {1},
- new Uuid[] {DirectoryId.UNASSIGNED, DirectoryId.LOST},
- Arrays.asList(2, 3)
- ));
- assertEquals(
- Arrays.asList(
- Uuid.fromString("YXY0bQYEQmmyOQ6ZDfGgSQ"),
- Uuid.fromString("5SZij3DRQgaFbvzR9KooLg"),
- DirectoryId.UNASSIGNED
- ),
- DirectoryId.createDirectoriesFrom(
- new int[] {1, 2, 3},
- new Uuid[] {
- Uuid.fromString("MgVK5KSwTxe65eYATaoQrg"),
- Uuid.fromString("YXY0bQYEQmmyOQ6ZDfGgSQ"),
- Uuid.fromString("5SZij3DRQgaFbvzR9KooLg")
- },
- Arrays.asList(2, 3, 4)
- )
- );
- assertEquals(
- Arrays.asList(
- DirectoryId.UNASSIGNED,
- DirectoryId.UNASSIGNED,
- DirectoryId.UNASSIGNED
- ),
- DirectoryId.createDirectoriesFrom(
- new int[] {1, 2},
- new Uuid[] {
- DirectoryId.UNASSIGNED,
- DirectoryId.UNASSIGNED
- },
- Arrays.asList(1, 2, 3)
- )
- );
- }
-
@Test
void testCreateAssignmentMap() {
- assertThrows(IllegalArgumentException.class,
- () -> DirectoryId.createAssignmentMap(new int[]{1, 2}, DirectoryId.unassignedArray(3)));
+ assertThrows(IllegalArgumentException.class, () ->
+ DirectoryId.createAssignmentMap(new int[]{1, 2}, DirectoryId.unassignedArray(3)));
assertEquals(
new HashMap<Integer, Uuid>() {{
put(1, Uuid.fromString("upjfkCrUR9GNn1i94ip1wg"));
@@ -102,4 +64,21 @@ public class DirectoryIdTest {
})
);
}
+
+ @Test
+ void testIsOnline() {
+ List<Uuid> sortedDirs = Arrays.asList(
+ Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"),
+ Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"),
+ Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw")
+ );
+ sortedDirs.sort(Uuid::compareTo);
+ assertTrue(DirectoryId.isOnline(Uuid.fromString("imQKg2cXTVe8OUFNa3R9bg"), sortedDirs));
+ assertTrue(DirectoryId.isOnline(Uuid.fromString("Mwy5wxTDQxmsZwGzjsaX7w"), sortedDirs));
+ assertTrue(DirectoryId.isOnline(Uuid.fromString("s8rHMluuSDCnxt3FmKwiyw"), sortedDirs));
+ assertTrue(DirectoryId.isOnline(DirectoryId.MIGRATING, sortedDirs));
+ assertTrue(DirectoryId.isOnline(DirectoryId.UNASSIGNED, sortedDirs));
+ assertFalse(DirectoryId.isOnline(DirectoryId.LOST, sortedDirs));
+ assertFalse(DirectoryId.isOnline(Uuid.fromString("AMYchbMtS6yhtsXbca7DQg"), sortedDirs));
+ }
}