You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/06/01 23:33:11 UTC
[kafka] branch trunk updated: MINOR: implement BrokerRegistrationChangeRecord (#12195)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65b4374203 MINOR: implement BrokerRegistrationChangeRecord (#12195)
65b4374203 is described below
commit 65b4374203608dc4d68544bde16d4acd2e31efd1
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Jun 1 16:33:01 2022 -0700
MINOR: implement BrokerRegistrationChangeRecord (#12195)
Implement BrokerRegistrationChangeRecord as specified in KIP-746. This is a more flexible record than the
single-purpose Fence / Unfence records.
Reviewers: José Armando García Sancio <js...@gmail.com>, dengziming <de...@gmail.com>
---
.../kafka/controller/ClusterControlManager.java | 79 ++++++++++++-------
.../apache/kafka/controller/QuorumController.java | 7 +-
.../controller/ReplicationControlManager.java | 89 ++++++++++++++--------
.../metadata/BrokerRegistrationFencingChange.java | 56 ++++++++++++++
.../controller/ClusterControlManagerTest.java | 37 +++++++--
.../controller/FeatureControlManagerTest.java | 4 +-
.../kafka/controller/QuorumControllerTest.java | 2 +-
.../controller/ReplicationControlManagerTest.java | 2 -
.../BrokerRegistrationFencingChangeTest.java | 50 ++++++++++++
.../kafka/server/common/MetadataVersion.java | 9 ++-
.../kafka/server/common/MetadataVersionTest.java | 6 +-
11 files changed, 265 insertions(+), 76 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 90dfb571b1..5c069e12e7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
@@ -32,10 +33,12 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
@@ -379,7 +382,7 @@ public class ClusterControlManager {
if (registration == null) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration found for that id", record.toString()));
- } else if (registration.epoch() != record.brokerEpoch()) {
+ } else if (registration.epoch() != record.brokerEpoch()) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
} else {
@@ -391,41 +394,56 @@ public class ClusterControlManager {
}
public void replay(FenceBrokerRecord record) {
- int brokerId = record.id();
- BrokerRegistration registration = brokerRegistrations.get(brokerId);
- if (registration == null) {
- throw new RuntimeException(String.format("Unable to replay %s: no broker " +
- "registration found for that id", record.toString()));
- } else if (registration.epoch() != record.epoch()) {
- throw new RuntimeException(String.format("Unable to replay %s: no broker " +
- "registration with that epoch found", record.toString()));
- } else {
- if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
- brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
- updateMetrics(registration, brokerRegistrations.get(brokerId));
- log.info("Fenced broker: {}", record);
- }
+ replayRegistrationChange(record, record.id(), record.epoch(),
+ BrokerRegistrationFencingChange.UNFENCE);
}
public void replay(UnfenceBrokerRecord record) {
- int brokerId = record.id();
- BrokerRegistration registration = brokerRegistrations.get(brokerId);
- if (registration == null) {
+ replayRegistrationChange(record, record.id(), record.epoch(),
+ BrokerRegistrationFencingChange.FENCE);
+ }
+
+ public void replay(BrokerRegistrationChangeRecord record) {
+ Optional<BrokerRegistrationFencingChange> fencingChange =
+ BrokerRegistrationFencingChange.fromValue(record.fenced());
+ if (!fencingChange.isPresent()) {
+ throw new RuntimeException(String.format("Unable to replay %s: unknown " +
+ "value for fenced field: %d", record.toString(), record.fenced()));
+ }
+ replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(),
+ fencingChange.get());
+ }
+
+ private void replayRegistrationChange(
+ ApiMessage record,
+ int brokerId,
+ long brokerEpoch,
+ BrokerRegistrationFencingChange fencingChange
+ ) {
+ BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
+ if (curRegistration == null) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration found for that id", record.toString()));
- } else if (registration.epoch() != record.epoch()) {
+ } else if (curRegistration.epoch() != brokerEpoch) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
} else {
- if (heartbeatManager != null) heartbeatManager.register(brokerId, false);
- brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
- updateMetrics(registration, brokerRegistrations.get(brokerId));
- log.info("Unfenced broker: {}", record);
- }
- if (readyBrokersFuture.isPresent()) {
- if (readyBrokersFuture.get().check()) {
- readyBrokersFuture.get().future.complete(null);
- readyBrokersFuture = Optional.empty();
+ BrokerRegistration nextRegistration = curRegistration;
+ if (fencingChange != BrokerRegistrationFencingChange.NONE) {
+ nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get());
+ }
+ if (!curRegistration.equals(nextRegistration)) {
+ brokerRegistrations.put(brokerId, nextRegistration);
+ updateMetrics(curRegistration, nextRegistration);
+ } else {
+ log.info("Ignoring no-op registration change for {}", curRegistration);
+ }
+ if (heartbeatManager != null) heartbeatManager.register(brokerId, nextRegistration.fenced());
+ if (readyBrokersFuture.isPresent()) {
+ if (readyBrokersFuture.get().check()) {
+ readyBrokersFuture.get().future.complete(null);
+ readyBrokersFuture = Optional.empty();
+ }
}
}
}
@@ -437,19 +455,24 @@ public class ClusterControlManager {
} else {
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
}
+ log.info("Removed broker: {}", prevRegistration.id());
} else if (prevRegistration == null) {
if (registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
+ log.info("Added new fenced broker: {}", registration.id());
} else {
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
+ log.info("Added new unfenced broker: {}", registration.id());
}
} else {
if (prevRegistration.fenced() && !registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
+ log.info("Unfenced broker: {}", registration.id());
} else if (!prevRegistration.fenced() && registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
+ log.info("Fenced broker: {}", registration.id());
}
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 97a2040a38..e921ac4067 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -1262,7 +1263,6 @@ public final class QuorumController implements Controller {
break;
case FEATURE_LEVEL_RECORD:
featureControl.replay((FeatureLevelRecord) message);
-
handleFeatureControlChange();
break;
case CLIENT_QUOTA_RECORD:
@@ -1271,6 +1271,9 @@ public final class QuorumController implements Controller {
case PRODUCER_IDS_RECORD:
producerIdControlManager.replay((ProducerIdsRecord) message);
break;
+ case BROKER_REGISTRATION_CHANGE_RECORD:
+ clusterControl.replay((BrokerRegistrationChangeRecord) message);
+ break;
case ACCESS_CONTROL_ENTRY_RECORD:
aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
break;
@@ -1573,7 +1576,6 @@ public final class QuorumController implements Controller {
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
this.maxIdleIntervalNs = maxIdleIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
- setMetadataVersion(() -> featureControl.metadataVersion()).
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
@@ -1583,6 +1585,7 @@ public final class QuorumController implements Controller {
setClusterControl(clusterControl).
setControllerMetrics(controllerMetrics).
setCreateTopicPolicy(createTopicPolicy).
+ setFeatureControl(featureControl).
build();
this.authorizer = authorizer;
authorizer.ifPresent(a -> a.setAclMutator(this));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 81ec253434..13e41c978c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -17,6 +17,7 @@
package org.apache.kafka.controller;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ElectionType;
@@ -63,6 +64,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.Lis
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -75,6 +77,7 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
@@ -112,12 +115,9 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
-import static org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BROKER_RECORD;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
@@ -139,7 +139,6 @@ public class ReplicationControlManager {
static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
static class Builder {
- private Supplier<MetadataVersion> metadataVersion = MetadataVersion::latest;
private SnapshotRegistry snapshotRegistry = null;
private LogContext logContext = null;
private short defaultReplicationFactor = (short) 3;
@@ -149,11 +148,7 @@ public class ReplicationControlManager {
private ClusterControlManager clusterControl = null;
private ControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
-
- Builder setMetadataVersion(Supplier<MetadataVersion> metadataVersion) {
- this.metadataVersion = metadataVersion;
- return this;
- }
+ private FeatureControlManager featureControl = null;
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
@@ -200,6 +195,11 @@ public class ReplicationControlManager {
return this;
}
+ public Builder setFeatureControl(FeatureControlManager featureControl) {
+ this.featureControl = featureControl;
+ return this;
+ }
+
ReplicationControlManager build() {
if (configurationControl == null) {
throw new IllegalStateException("Configuration control must be set before building");
@@ -210,9 +210,17 @@ public class ReplicationControlManager {
}
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
- return new ReplicationControlManager(
- metadataVersion,
- snapshotRegistry,
+ if (featureControl == null) {
+ featureControl = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setSnapshotRegistry(snapshotRegistry).
+ setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+ QuorumFeatures.defaultFeatureMap(),
+ Collections.singletonList(0))).
+ setMetadataVersion(MetadataVersion.latest()).
+ build();
+ }
+ return new ReplicationControlManager(snapshotRegistry,
logContext,
defaultReplicationFactor,
defaultNumPartitions,
@@ -220,7 +228,8 @@ public class ReplicationControlManager {
configurationControl,
clusterControl,
controllerMetrics,
- createTopicPolicy);
+ createTopicPolicy,
+ featureControl);
}
}
@@ -275,11 +284,6 @@ public class ReplicationControlManager {
*/
private final int defaultNumPartitions;
- /**
- * Metadata version (or IBP) for the cluster
- */
- private final Supplier<MetadataVersion> metadataVersion;
-
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
@@ -310,6 +314,11 @@ public class ReplicationControlManager {
*/
private final Optional<CreateTopicPolicy> createTopicPolicy;
+ /**
+ * The feature control manager.
+ */
+ private final FeatureControlManager featureControl;
+
/**
* Maps topic names to topic UUIDs.
*/
@@ -358,7 +367,6 @@ public class ReplicationControlManager {
final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
private ReplicationControlManager(
- Supplier<MetadataVersion> metadataVersion,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
short defaultReplicationFactor,
@@ -367,9 +375,9 @@ public class ReplicationControlManager {
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
ControllerMetrics controllerMetrics,
- Optional<CreateTopicPolicy> createTopicPolicy
+ Optional<CreateTopicPolicy> createTopicPolicy,
+ FeatureControlManager featureControl
) {
- this.metadataVersion = metadataVersion;
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
@@ -378,6 +386,7 @@ public class ReplicationControlManager {
this.configurationControl = configurationControl;
this.controllerMetrics = controllerMetrics;
this.createTopicPolicy = createTopicPolicy;
+ this.featureControl = featureControl;
this.clusterControl = clusterControl;
this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -929,7 +938,7 @@ public class ReplicationControlManager {
topic.id,
partitionId,
r -> clusterControl.unfenced(r),
- metadataVersion.get().isLeaderRecoverySupported());
+ featureControl.metadataVersion().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1095,9 +1104,16 @@ public class ReplicationControlManager {
}
generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
- records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
- setId(brokerId).setEpoch(brokerRegistration.epoch()),
- FENCE_BROKER_RECORD.highestSupportedVersion()));
+ if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
+ records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+ setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).
+ setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
+ (short) 0));
+ } else {
+ records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
+ setId(brokerId).setEpoch(brokerRegistration.epoch()),
+ (short) 0));
+ }
}
/**
@@ -1116,7 +1132,7 @@ public class ReplicationControlManager {
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch),
- UNREGISTER_BROKER_RECORD.highestSupportedVersion()));
+ (short) 0));
}
/**
@@ -1131,8 +1147,15 @@ public class ReplicationControlManager {
* @param records The record list to append to.
*/
void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
- records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
- setEpoch(brokerEpoch), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
+ if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
+ records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+ setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+ setFenced(BrokerRegistrationFencingChange.FENCE.value()),
+ (short) 0));
+ } else {
+ records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
+ setEpoch(brokerEpoch), (short) 0));
+ }
generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithNoLeader());
}
@@ -1223,7 +1246,7 @@ public class ReplicationControlManager {
topicId,
partitionId,
r -> clusterControl.unfenced(r),
- metadataVersion.get().isLeaderRecoverySupported());
+ featureControl.metadataVersion().isLeaderRecoverySupported());
builder.setElection(election);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
@@ -1339,7 +1362,7 @@ public class ReplicationControlManager {
topicPartition.topicId(),
topicPartition.partitionId(),
r -> clusterControl.unfenced(r),
- metadataVersion.get().isLeaderRecoverySupported()
+ featureControl.metadataVersion().isLeaderRecoverySupported()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
builder.build().ifPresent(records::add);
@@ -1542,7 +1565,7 @@ public class ReplicationControlManager {
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
- metadataVersion.get().isLeaderRecoverySupported());
+ featureControl.metadataVersion().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1652,7 +1675,7 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
r -> clusterControl.unfenced(r),
- metadataVersion.get().isLeaderRecoverySupported());
+ featureControl.metadataVersion().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1704,7 +1727,7 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
r -> clusterControl.unfenced(r),
- metadataVersion.get().isLeaderRecoverySupported());
+ featureControl.metadataVersion().isLeaderRecoverySupported());
if (!reassignment.merged().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.merged());
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
new file mode 100644
index 0000000000..1c2729355f
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+public enum BrokerRegistrationFencingChange { // Three-valued fencing change: each constant pairs a signed byte value with an optional boolean.
+ FENCE(-1, Optional.of(false)), // value() == -1, asBoolean() == Optional.of(false). NOTE(review): this boolean is what replayRegistrationChange hands to cloneWithFencing(...), so FENCE/UNFENCE look inverted relative to their names -- confirm.
+ NONE(0, Optional.empty()), // value() == 0, asBoolean() is empty: there is no boolean to apply.
+ UNFENCE(1, Optional.of(true)); // value() == 1, asBoolean() == Optional.of(true).
+
+ private final byte value; // Numeric form: returned by value() and used as the lookup key in fromValue().
+
+ private final Optional<Boolean> asBoolean; // Boolean form of the change; empty only for NONE.
+
+ private final static Map<Byte, BrokerRegistrationFencingChange> VALUE_TO_ENUM = // Reverse index from byte value to constant, built once from values().
+ Arrays.stream(BrokerRegistrationFencingChange.values()).
+ collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+
+ public static Optional<BrokerRegistrationFencingChange> fromValue(byte value) { // Looks up the constant for a byte; empty when no constant has that value.
+ return Optional.ofNullable(VALUE_TO_ENUM.get(value));
+ }
+
+ BrokerRegistrationFencingChange(int value, Optional<Boolean> asBoolean) { // Takes an int for convenience at the declaration sites above.
+ this.value = (byte) value; // Narrowed to byte; the declared values (-1, 0, 1) all fit.
+ this.asBoolean = asBoolean;
+ }
+
+ public Optional<Boolean> asBoolean() { // Returns the boolean form of this change, or empty for NONE.
+ return asBoolean;
+ }
+
+ public byte value() { // Returns the byte value associated with this constant.
+ return value;
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index a173cf5f94..590578f6c6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -23,14 +23,17 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -43,12 +46,15 @@ import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,8 +63,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class ClusterControlManagerTest {
- @Test
- public void testReplay() {
+ @ParameterizedTest
+ @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV0", "IBP_3_3_IV2"})
+ public void testReplay(MetadataVersion metadataVersion) {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
@@ -86,11 +93,29 @@ public class ClusterControlManagerTest {
assertFalse(clusterControl.unfenced(0));
assertFalse(clusterControl.unfenced(1));
- UnfenceBrokerRecord unfenceBrokerRecord =
- new UnfenceBrokerRecord().setId(1).setEpoch(100);
- clusterControl.replay(unfenceBrokerRecord);
+ if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
+ UnfenceBrokerRecord unfenceBrokerRecord =
+ new UnfenceBrokerRecord().setId(1).setEpoch(100);
+ clusterControl.replay(unfenceBrokerRecord);
+ } else {
+ BrokerRegistrationChangeRecord changeRecord =
+ new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced((byte) -1);
+ clusterControl.replay(changeRecord);
+ }
assertFalse(clusterControl.unfenced(0));
assertTrue(clusterControl.unfenced(1));
+
+ if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
+ FenceBrokerRecord fenceBrokerRecord =
+ new FenceBrokerRecord().setId(1).setEpoch(100);
+ clusterControl.replay(fenceBrokerRecord);
+ } else {
+ BrokerRegistrationChangeRecord changeRecord =
+ new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced((byte) 1);
+ clusterControl.replay(changeRecord);
+ }
+ assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.unfenced(1));
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 3a8c4042df..fd1e5af39e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -251,11 +251,11 @@ public class FeatureControlManagerTest {
result = manager.updateFeatures(
- Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
+ Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_1_IV0.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(),
true);
- assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
+ assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index eef7215070..4e898b924c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -718,7 +718,7 @@ public class QuorumControllerTest {
setPartitionIndex(1).
setBrokerIds(Arrays.asList(1, 2, 0))).
iterator()))).iterator())),
- Collections.singleton(topicName)).get();
+ Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
}
logEnv.waitForLatestSnapshot();
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index abc90912a1..d7141a76dd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -76,7 +76,6 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@@ -165,7 +164,6 @@ public class ReplicationControlManagerTest {
ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
this.replicationControl = new ReplicationControlManager.Builder().
- setMetadataVersion(() -> MetadataVersion.IBP_3_3_IV1).
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
setMaxElectionsPerImbalance(Integer.MAX_VALUE).
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationFencingChangeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationFencingChangeTest.java
new file mode 100644
index 0000000000..9266192f24
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationFencingChangeTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(40)
+public class BrokerRegistrationFencingChangeTest {
+ @Test
+ public void testValues() {
+ assertEquals((byte) -1, BrokerRegistrationFencingChange.FENCE.value());
+ assertEquals((byte) 0, BrokerRegistrationFencingChange.NONE.value());
+ assertEquals((byte) 1, BrokerRegistrationFencingChange.UNFENCE.value());
+ }
+
+ @Test
+ public void testAsBoolean() {
+ assertEquals(Optional.of(false), BrokerRegistrationFencingChange.FENCE.asBoolean());
+ assertEquals(Optional.empty(), BrokerRegistrationFencingChange.NONE.asBoolean());
+ assertEquals(Optional.of(true), BrokerRegistrationFencingChange.UNFENCE.asBoolean());
+ }
+
+ @Test
+ public void testValueRoundTrip() {
+ for (BrokerRegistrationFencingChange change : BrokerRegistrationFencingChange.values()) {
+ assertEquals(Optional.of(change), BrokerRegistrationFencingChange.fromValue(change.value()));
+ }
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 920c60300d..ee19faf88b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -156,7 +156,10 @@ public enum MetadataVersion {
IBP_3_3_IV0(5, "3.3", "IV0", false),
// Support NoopRecord for the cluster metadata log (KIP-835)
- IBP_3_3_IV1(6, "3.3", "IV1", true);
+ IBP_3_3_IV1(6, "3.3", "IV1", true),
+
+ // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
+ IBP_3_3_IV2(7, "3.3", "IV2", true);
public static final String FEATURE_NAME = "metadata.version";
@@ -236,6 +239,10 @@ public enum MetadataVersion {
}
}
+ public boolean isBrokerRegistrationChangeRecordSupported() {
+ return this.isAtLeast(IBP_3_3_IV2);
+ }
+
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 8649586be9..07a644e729 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -61,6 +61,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -189,9 +190,10 @@ class MetadataVersionTest {
assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
- assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3"));
+ assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3"));
assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3-IV1"));
+ assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3-IV2"));
}
@Test
@@ -237,6 +239,7 @@ class MetadataVersionTest {
assertEquals("3.2", IBP_3_2_IV0.shortVersion());
assertEquals("3.3", IBP_3_3_IV0.shortVersion());
assertEquals("3.3", IBP_3_3_IV1.shortVersion());
+ assertEquals("3.3", IBP_3_3_IV2.shortVersion());
}
@Test
@@ -271,6 +274,7 @@ class MetadataVersionTest {
assertEquals("3.2-IV0", IBP_3_2_IV0.version());
assertEquals("3.3-IV0", IBP_3_3_IV0.version());
assertEquals("3.3-IV1", IBP_3_3_IV1.version());
+ assertEquals("3.3-IV2", IBP_3_3_IV2.version());
}
@Test