You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/04 19:29:16 UTC
[kafka] branch trunk updated: MINOR: Enable topic deletion in the
KIP-500 controller (#10184)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eebc6f2 MINOR: Enable topic deletion in the KIP-500 controller (#10184)
eebc6f2 is described below
commit eebc6f279e61bcd9a331a3a0b305b57ffdc1ab66
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Mar 4 11:28:20 2021 -0800
MINOR: Enable topic deletion in the KIP-500 controller (#10184)
This patch enables delete topic support for the new KIP-500 controller. Also fixes the following:
- Fix a bug where feature level records were not correctly replayed.
- Fix a bug in TimelineHashMap#remove where the wrong type was being returned.
Reviewers: Jason Gustafson <ja...@confluent.io>, Justine Olshan <jo...@confluent.io>, Ron Dagostino <rd...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Jun Rao <ju...@gmail.com>
Co-authored-by: Jason Gustafson <ja...@confluent.io>
---
checkstyle/import-control-core.xml | 2 +
checkstyle/suppressions.xml | 2 +-
.../main/scala/kafka/server/ControllerApis.scala | 203 ++++++++++++++++---
core/src/test/java/kafka/test/MockController.java | 222 +++++++++++++++++++++
.../unit/kafka/server/ControllerApisTest.scala | 217 +++++++++++++++++++-
.../org/apache/kafka/controller/BrokersToIsrs.java | 7 +
.../controller/ConfigurationControlManager.java | 6 +-
.../org/apache/kafka/controller/Controller.java | 26 +++
.../apache/kafka/controller/QuorumController.java | 68 +++++--
.../controller/ReplicationControlManager.java | 131 ++++++++++--
.../org/apache/kafka/controller/ResultOrError.java | 2 +-
.../kafka/timeline/SnapshottableHashTable.java | 2 +-
.../org/apache/kafka/timeline/TimelineHashMap.java | 7 +-
.../controller/ReplicationControlManagerTest.java | 195 ++++++++++++++----
.../apache/kafka/timeline/TimelineHashMapTest.java | 2 +
15 files changed, 980 insertions(+), 112 deletions(-)
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index e9653ba..e5abcca 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -59,6 +59,8 @@
</subpackage>
<subpackage name="test">
+ <allow pkg="org.apache.kafka.controller"/>
+ <allow pkg="org.apache.kafka.metadata"/>
<allow pkg="kafka.test.annotation"/>
<allow pkg="kafka.test.junit"/>
<allow pkg="kafka.network"/>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 33273fe..242fef6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -265,7 +265,7 @@
<!-- metadata -->
<suppress checks="ClassDataAbstractionCoupling"
- files="(ReplicationControlManager).java"/>
+ files="(ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(QuorumController|ReplicationControlManager).java"/>
<suppress checks="CyclomaticComplexity"
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 5670448..1d9fd44 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -18,21 +18,26 @@
package kafka.server
import java.util
+import java.util.Collections
+import java.util.concurrent.ExecutionException
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
+import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException}
+import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource
@@ -46,6 +51,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import scala.collection.mutable
import scala.jdk.CollectionConverters._
+
/**
* Request handler for Controller APIs
*/
@@ -70,6 +76,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
+ case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.VOTE => handleVote(request)
case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
@@ -84,10 +91,11 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
- case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
+ case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
case e: FatalExitError => throw e
+ case e: ExecutionException => requestHelper.handleError(request, e.getCause)
case e: Throwable => requestHelper.handleError(request, e)
}
}
@@ -124,14 +132,14 @@ class ControllerApis(val requestChannel: RequestChannel,
val metadataResponseData = new MetadataResponseData()
metadataResponseData.setThrottleTimeMs(requestThrottleMs)
controllerNodes.foreach { node =>
- metadataResponseData.brokers().add(new MetadataResponseBroker()
+ metadataResponseData.brokers.add(new MetadataResponseBroker()
.setHost(node.host)
.setNodeId(node.id)
.setPort(node.port)
.setRack(node.rack))
}
metadataResponseData.setClusterId(metaProperties.clusterId.toString)
- if (controller.isActive()) {
+ if (controller.isActive) {
metadataResponseData.setControllerId(config.nodeId)
} else {
metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
@@ -153,17 +161,164 @@ class ControllerApis(val requestChannel: RequestChannel,
requestThrottleMs => createResponseCallback(requestThrottleMs))
}
+ def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+ val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+ request.context.apiVersion,
+ authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+ names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+ names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+ requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+ val responseData = new DeleteTopicsResponseData().
+ setResponses(new DeletableTopicResultCollection(responses.iterator)).
+ setThrottleTimeMs(throttleTimeMs)
+ new DeleteTopicsResponse(responseData)
+ })
+ }
+
+ def deleteTopics(request: DeleteTopicsRequestData,
+ apiVersion: Int,
+ hasClusterAuth: Boolean,
+ getDescribableTopics: Iterable[String] => Set[String],
+ getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+ // Check if topic deletion is enabled at all.
+ if (!config.deleteTopicEnable) {
+ if (apiVersion < 3) {
+ throw new InvalidRequestException("Topic deletion is disabled.")
+ } else {
+ throw new TopicDeletionDisabledException()
+ }
+ }
+ // The first step is to load up the names and IDs that have been provided by the
+ // request. This is a bit messy because we support multiple ways of referring to
+ // topics (both by name and by id) and because we need to check for duplicates or
+ // other invalid inputs.
+ val responses = new util.ArrayList[DeletableTopicResult]
+ def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+ responses.add(new DeletableTopicResult().
+ setName(name).
+ setTopicId(id).
+ setErrorCode(error.error.code).
+ setErrorMessage(error.message))
+ }
+ val providedNames = new util.HashSet[String]
+ val duplicateProvidedNames = new util.HashSet[String]
+ val providedIds = new util.HashSet[Uuid]
+ val duplicateProvidedIds = new util.HashSet[Uuid]
+ def addProvidedName(name: String): Unit = {
+ if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+ duplicateProvidedNames.add(name)
+ providedNames.remove(name)
+ }
+ }
+ request.topicNames.forEach(addProvidedName)
+ request.topics.forEach {
+ topic => if (topic.name == null) {
+ if (topic.topicId.equals(ZERO_UUID)) {
+ appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+ "Neither topic name nor id were specified."))
+ } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) {
+ duplicateProvidedIds.add(topic.topicId)
+ providedIds.remove(topic.topicId)
+ }
+ } else {
+ if (topic.topicId.equals(ZERO_UUID)) {
+ addProvidedName(topic.name)
+ } else {
+ appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+ "You may not specify both topic name and topic id."))
+ }
+ }
+ }
+ // Create error responses for duplicates.
+ duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+ new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+ duplicateProvidedIds.forEach(id => appendResponse(null, id,
+ new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+ // At this point we have all the valid names and IDs that have been provided.
+ // However, the Authorizer needs topic names as inputs, not topic IDs. So
+ // we need to resolve all IDs to names.
+ val toAuthenticate = new util.HashSet[String]
+ toAuthenticate.addAll(providedNames)
+ val idToName = new util.HashMap[Uuid, String]
+ controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+ if (nameOrError.isError) {
+ appendResponse(null, id, nameOrError.error())
+ } else {
+ toAuthenticate.add(nameOrError.result())
+ idToName.put(id, nameOrError.result())
+ }
+ }
+ // Get the list of deletable topics (those we can delete) and the list of describeable
+ // topics. If a topic can't be deleted or described, we have to act like it doesn't
+ // exist, even when it does.
+ val topicsToAuthenticate = toAuthenticate.asScala
+ val (describeable, deletable) = if (hasClusterAuth) {
+ (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+ } else {
+ (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
+ }
+ // For each topic that was provided by ID, check if authentication failed.
+ // If so, remove it from the idToName map and create an error response for it.
+ val iterator = idToName.entrySet().iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ val id = entry.getKey
+ val name = entry.getValue
+ if (!deletable.contains(name)) {
+ if (describeable.contains(name)) {
+ appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+ } else {
+ appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+ }
+ iterator.remove()
+ }
+ }
+ // For each topic that was provided by name, check if authentication failed.
+ // If so, create an error response for it. Otherwise, add it to the idToName map.
+ controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
+ if (!describeable.contains(name)) {
+ appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+ } else if (idOrError.isError) {
+ appendResponse(name, ZERO_UUID, idOrError.error)
+ } else if (deletable.contains(name)) {
+ val id = idOrError.result()
+ if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
+ // This is kind of a weird case: what if we supply topic ID X and also a name
+ // that maps to ID X? In that case, _if authorization succeeds_, we end up
+ // here. If authorization doesn't succeed, we refrain from commenting on the
+ // situation since it would reveal topic ID mappings.
+ duplicateProvidedIds.add(id)
+ idToName.remove(id)
+ appendResponse(name, id, new ApiError(INVALID_REQUEST,
+ "The provided topic name maps to an ID that was already supplied."))
+ }
+ } else {
+ appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+ }
+ }
+ // Finally, the idToName map contains all the topics that we are authorized to delete.
+ // Perform the deletion and create responses for each one.
+ val idToError = controller.deleteTopics(idToName.keySet).get()
+ idToError.forEach { (id, error) =>
+ appendResponse(idToName.get(id), id, error)
+ }
+ // Shuffle the responses so that users can not use patterns in their positions to
+ // distinguish between absent topics and topics we are not permitted to see.
+ Collections.shuffle(responses)
+ responses
+ }
+
def handleCreateTopics(request: RequestChannel.Request): Unit = {
val createTopicRequest = request.body[CreateTopicsRequest]
val (authorizedCreateRequest, unauthorizedTopics) =
if (authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME)) {
(createTopicRequest.data, Seq.empty)
} else {
- val duplicate = createTopicRequest.data.duplicate()
+ val duplicate = createTopicRequest.data.duplicate
val authorizedTopics = new CreatableTopicCollection()
val unauthorizedTopics = mutable.Buffer.empty[String]
- createTopicRequest.data.topics.asScala.foreach { topicData =>
+ createTopicRequest.data.topics.forEach { topicData =>
if (authHelper.authorize(request.context, CREATE, TOPIC, topicData.name)) {
authorizedTopics.add(topicData)
} else {
@@ -177,7 +332,7 @@ class ControllerApis(val requestChannel: RequestChannel,
unauthorizedTopics.foreach { topic =>
val result = new CreatableTopicResult()
.setName(topic)
- .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
response.topics.add(result)
}
@@ -214,7 +369,7 @@ class ControllerApis(val requestChannel: RequestChannel,
if (apiVersionRequest.hasUnsupportedRequestVersion) {
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
} else if (!apiVersionRequest.isValid) {
- apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
+ apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception)
} else {
apiVersionManager.apiVersionResponse(requestThrottleMs)
}
@@ -245,7 +400,7 @@ class ControllerApis(val requestChannel: RequestChannel,
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val alterIsrRequest = request.body[AlterIsrRequest]
- val future = controller.alterIsr(alterIsrRequest.data())
+ val future = controller.alterIsr(alterIsrRequest.data)
future.whenComplete((result, exception) => {
val response = if (exception != null) {
alterIsrRequest.getErrorResponse(exception)
@@ -267,14 +422,14 @@ class ControllerApis(val requestChannel: RequestChannel,
if (e != null) {
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
setThrottleTimeMs(requestThrottleMs).
- setErrorCode(Errors.forException(e).code()))
+ setErrorCode(Errors.forException(e).code))
} else {
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
setThrottleTimeMs(requestThrottleMs).
- setErrorCode(Errors.NONE.code()).
- setIsCaughtUp(reply.isCaughtUp()).
- setIsFenced(reply.isFenced()).
- setShouldShutDown(reply.shouldShutDown()))
+ setErrorCode(Errors.NONE.code).
+ setIsCaughtUp(reply.isCaughtUp).
+ setIsFenced(reply.isFenced).
+ setShouldShutDown(reply.shouldShutDown))
}
}
requestHelper.sendResponseMaybeThrottle(request,
@@ -292,7 +447,7 @@ class ControllerApis(val requestChannel: RequestChannel,
if (e != null) {
new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
setThrottleTimeMs(requestThrottleMs).
- setErrorCode(Errors.forException(e).code()))
+ setErrorCode(Errors.forException(e).code))
} else {
new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
setThrottleTimeMs(requestThrottleMs))
@@ -318,7 +473,7 @@ class ControllerApis(val requestChannel: RequestChannel,
} else {
new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
setThrottleTimeMs(requestThrottleMs).
- setErrorCode(Errors.NONE.code()).
+ setErrorCode(Errors.NONE.code).
setBrokerEpoch(reply.epoch))
}
}
@@ -346,7 +501,7 @@ class ControllerApis(val requestChannel: RequestChannel,
val quotaRequest = request.body[AlterClientQuotasRequest]
authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
- controller.alterClientQuotas(quotaRequest.entries(), quotaRequest.validateOnly())
+ controller.alterClientQuotas(quotaRequest.entries, quotaRequest.validateOnly)
.whenComplete((results, exception) => {
if (exception != null) {
requestHelper.handleError(request, exception)
@@ -362,15 +517,15 @@ class ControllerApis(val requestChannel: RequestChannel,
authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
alterConfigsRequest.data.resources.forEach { resource =>
- val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType()), resource.resourceName())
+ val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
- altersByName.put(config.name(), new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
- AlterConfigOp.OpType.forId(config.configOperation()), config.value()))
+ altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
+ AlterConfigOp.OpType.forId(config.configOperation), config.value))
}
configChanges.put(configResource, altersByName)
}
- controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data().validateOnly())
+ controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data.validateOnly)
.whenComplete((results, exception) => {
if (exception != null) {
requestHelper.handleError(request, exception)
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
new file mode 100644
index 0000000..d38cab5
--- /dev/null
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+ private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+ new NotControllerException("This is not the correct controller for this cluster.");
+
+ public static class Builder {
+ private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+ public Builder newInitialTopic(String name, Uuid id) {
+ initialTopics.put(name, new MockTopic(name, id));
+ return this;
+ }
+
+ public MockController build() {
+ return new MockController(initialTopics.values());
+ }
+ }
+
+ private volatile boolean active = true;
+
+ private MockController(Collection<MockTopic> initialTopics) {
+ for (MockTopic topic : initialTopics) {
+ topics.put(topic.id, topic);
+ topicNameToId.put(topic.name, topic.id);
+ }
+ }
+
+ @Override
+ public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> unregisterBroker(int brokerId) {
+ throw new UnsupportedOperationException();
+ }
+
+ static class MockTopic {
+ private final String name;
+ private final Uuid id;
+
+ MockTopic(String name, Uuid id) {
+ this.name = name;
+ this.id = id;
+ }
+ }
+
+ private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+ private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+ @Override
+ synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+ findTopicIds(Collection<String> topicNames) {
+ Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+ for (String topicName : topicNames) {
+ if (!topicNameToId.containsKey(topicName)) {
+ results.put(topicName, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+ } else {
+ results.put(topicName, new ResultOrError<>(topicNameToId.get(topicName)));
+ }
+ }
+ return CompletableFuture.completedFuture(results);
+ }
+
+ @Override
+ synchronized public CompletableFuture<Map<Uuid, ResultOrError<String>>>
+ findTopicNames(Collection<Uuid> topicIds) {
+ Map<Uuid, ResultOrError<String>> results = new HashMap<>();
+ for (Uuid topicId : topicIds) {
+ MockTopic topic = topics.get(topicId);
+ if (topic == null) {
+ results.put(topicId, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_ID)));
+ } else {
+ results.put(topicId, new ResultOrError<>(topic.name));
+ }
+ }
+ return CompletableFuture.completedFuture(results);
+ }
+
+ @Override
+ synchronized public CompletableFuture<Map<Uuid, ApiError>>
+ deleteTopics(Collection<Uuid> topicIds) {
+ if (!active) {
+ CompletableFuture<Map<Uuid, ApiError>> future = new CompletableFuture<>();
+ future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
+ return future;
+ }
+ Map<Uuid, ApiError> results = new HashMap<>();
+ for (Uuid topicId : topicIds) {
+ MockTopic topic = topics.remove(topicId);
+ if (topic == null) {
+ results.put(topicId, new ApiError(Errors.UNKNOWN_TOPIC_ID));
+ } else {
+ topicNameToId.remove(topic.name);
+ results.put(topicId, ApiError.NONE);
+ }
+ }
+ return CompletableFuture.completedFuture(results);
+ }
+
+ @Override
+ public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(Map<ConfigResource, Collection<String>> resources) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+ Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
+ boolean validateOnly) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+ Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<BrokerHeartbeatReply>
+ processBrokerHeartbeat(BrokerHeartbeatRequestData request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<BrokerRegistrationReply>
+ registerBroker(BrokerRegistrationRequestData request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Map<ClientQuotaEntity, ApiError>>
+ alterClientQuotas(Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void beginShutdown() {
+ this.active = false;
+ }
+
+ public void setActive(boolean active) {
+ this.active = active;
+ }
+
+ @Override
+ public long curClaimEpoch() {
+ return active ? 1 : -1;
+ }
+
+ @Override
+ public void close() {
+ beginShutdown();
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 2f1c91e..8e400ed 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -15,22 +15,28 @@
* limitations under the License.
*/
-package unit.kafka.server
+package kafka.server
import java.net.InetAddress
+import java.util
import java.util.Properties
+import java.util.concurrent.ExecutionException
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager, SimpleApiVersionManager}
+import kafka.test.MockController
import kafka.utils.MockTime
import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.errors.{InvalidRequestException, NotControllerException, TopicDeletionDisabledException}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.message.BrokerRegistrationRequestData
+import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, DeleteTopicsRequestData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerRegistrationRequest, BrokerRegistrationResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.controller.Controller
@@ -56,7 +62,6 @@ class ControllerApisTest {
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
private val raftManager: RaftManager[ApiMessageAndVersion] = mock(classOf[RaftManager[ApiMessageAndVersion]])
- private val authorizer: Authorizer = mock(classOf[Authorizer])
private val quotas = QuotaManagers(
clientQuotaManager,
@@ -67,15 +72,15 @@ class ControllerApisTest {
replicaQuotaManager,
replicaQuotaManager,
None)
- private val controller: Controller = mock(classOf[Controller])
- private def createControllerApis(): ControllerApis = {
- val props = new Properties()
+ private def createControllerApis(authorizer: Option[Authorizer],
+ controller: Controller,
+ props: Properties = new Properties()): ControllerApis = {
props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
props.put(KafkaConfig.ProcessRolesProp, "controller")
new ControllerApis(
requestChannel,
- Some(authorizer),
+ authorizer,
quotas,
time,
Map.empty,
@@ -122,6 +127,7 @@ class ControllerApisTest {
val request = buildRequest(brokerRegistrationRequest)
val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
+ val authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(
any(classOf[AuthorizableRequestContext]),
any(classOf[java.util.List[Action]])
@@ -129,7 +135,7 @@ class ControllerApisTest {
java.util.Collections.singletonList(AuthorizationResult.DENIED)
)
- createControllerApis().handle(request)
+ createControllerApis(Some(authorizer), mock(classOf[Controller])).handle(request)
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
@@ -142,6 +148,197 @@ class ControllerApisTest {
brokerRegistrationResponse.errorCounts().asScala)
}
+ @Test
+ def testDeleteTopicsByName(): Unit = {
+ val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+ val controllerApis = createControllerApis(None, controller)
+ val request = new DeleteTopicsRequestData().setTopicNames(
+ util.Arrays.asList("foo", "bar", "quux", "quux"))
+ val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic name."),
+ new DeletableTopicResult().setName("bar").
+ setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+ setErrorMessage("This server does not host this topic-partition."),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId))
+ assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ true,
+ _ => Set.empty,
+ _ => Set.empty).asScala.toSet)
+ }
+
+ @Test
+ def testDeleteTopicsById(): Unit = {
+ val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+ val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+ val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+ val controllerApis = createControllerApis(None, controller)
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+ val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId).
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic id."),
+ new DeletableTopicResult().setName(null).setTopicId(barId).
+ setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+ setErrorMessage("This server does not host this topic ID."),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId))
+ assertEquals(response, controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ true,
+ _ => Set.empty,
+ _ => Set.empty).asScala.toSet)
+ }
+
+ @Test
+ def testInvalidDeleteTopicsRequest(): Unit = {
+ val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+ val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+ val controller = new MockController.Builder().
+ newInitialTopic("foo", fooId).
+ newInitialTopic("bar", barId).build()
+ val controllerApis = createControllerApis(None, controller)
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId))
+ request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+ request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+ val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("Neither topic name nor id were specified."),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId).
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("You may not specify both topic name and topic id."),
+ new DeletableTopicResult().setName("bar").setTopicId(barId).
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("The provided topic name maps to an ID that was already supplied."),
+ new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic name."),
+ new DeletableTopicResult().setName(null).setTopicId(bazId).
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic id."))
+ assertEquals(response, controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ names => names.toSet,
+ names => names.toSet).asScala.toSet)
+ }
+
+ @Test
+ def testNotAuthorizedToDeleteWithTopicExisting(): Unit = {
+ val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+ val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")
+ val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg")
+ val controller = new MockController.Builder().
+ newInitialTopic("foo", fooId).
+ newInitialTopic("bar", barId).
+ newInitialTopic("baz", bazId).
+ newInitialTopic("quux", quuxId).build()
+ val controllerApis = createControllerApis(None, controller)
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+ request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId).
+ setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+ setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+ setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId).
+ setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+ assertEquals(response, controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo", "baz"),
+ _ => Set.empty).asScala.toSet)
+ }
+
+ @Test
+ def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = {
+ val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+ val controller = new MockController.Builder().build()
+ val controllerApis = createControllerApis(None, controller)
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+ val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+ setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code).
+ setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message),
+ new DeletableTopicResult().setName("bar").
+ setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName(null).setTopicId(barId).
+ setErrorCode(Errors.UNKNOWN_TOPIC_ID.code).
+ setErrorMessage(Errors.UNKNOWN_TOPIC_ID.message))
+ assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo"),
+ _ => Set.empty).asScala.toSet)
+ }
+
+ @Test
+ def testNotControllerErrorPreventsDeletingTopics(): Unit = {
+ val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+ val controller = new MockController.Builder().
+ newInitialTopic("foo", fooId).build()
+ controller.setActive(false)
+ val controllerApis = createControllerApis(None, controller)
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+ request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+ assertEquals(classOf[NotControllerException], assertThrows(
+ classOf[ExecutionException], () => controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo", "bar"),
+ _ => Set("foo", "bar"))).getCause.getClass)
+ }
+
+ @Test
+ def testDeleteTopicsDisabled(): Unit = {
+ val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+ val controller = new MockController.Builder().
+ newInitialTopic("foo", fooId).build()
+ val props = new Properties()
+ props.put(KafkaConfig.DeleteTopicEnableProp, "false")
+ val controllerApis = createControllerApis(None, controller, props)
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+ assertThrows(classOf[TopicDeletionDisabledException], () => controllerApis.deleteTopics(request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo", "bar"),
+ _ => Set("foo", "bar")))
+ assertThrows(classOf[InvalidRequestException], () => controllerApis.deleteTopics(request,
+ 1,
+ false,
+ _ => Set("foo", "bar"),
+ _ => Set("foo", "bar")))
+ }
+
@AfterEach
def tearDown(): Unit = {
quotas.shutdown()
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 46148e7..9d54c20 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -214,6 +214,13 @@ public class BrokersToIsrs {
}
}
+ void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
+ Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
+ if (topicMap != null) {
+ topicMap.remove(topicId);
+ }
+ }
+
private void add(int brokerId, Uuid topicId, int newPartition, boolean leader) {
if (leader) {
newPartition = newPartition | LEADER_FLAG;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 5bc82ec..dcfe92d46 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -303,7 +303,7 @@ public class ConfigurationControlManager {
*
* @param record The ConfigRecord.
*/
- void replay(ConfigRecord record) {
+ public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type, record.resourceName());
TimelineHashMap<String, String> configs = configData.get(configResource);
@@ -364,4 +364,8 @@ public class ConfigurationControlManager {
}
return results;
}
+
+ void deleteTopicConfigs(String name) {
+ configData.remove(new ConfigResource(Type.TOPIC, name));
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 1ce63e0..2639463 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -18,6 +18,7 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
@@ -70,6 +71,31 @@ public interface Controller extends AutoCloseable {
CompletableFuture<Void> unregisterBroker(int brokerId);
/**
+ * Find the ids for topic names.
+ *
+ * @param topicNames The topic names to resolve.
+ * @return A future yielding a map from topic name to id.
+ */
+ CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> topicNames);
+
+ /**
+ * Find the names for topic ids.
+ *
+ * @param topicIds The topic ids to resolve.
+ * @return A future yielding a map from topic id to name.
+ */
+ CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> topicIds);
+
+ /**
+ * Delete a batch of topics.
+ *
+ * @param topicIds The IDs of the topics to delete.
+ *
+ * @return A future yielding the response.
+ */
+ CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> topicIds);
+
+ /**
* Describe the current configuration of various resources.
*
* @param resources A map from resources to the collection of config keys that we
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 759db1e..6ee1b7e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
@@ -44,12 +45,14 @@ import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.QuotaRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
@@ -448,7 +451,7 @@ public final class QuorumController implements Controller {
writeOffset = offset;
resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
- replay(message.message());
+ replay(message.message(), offset);
}
snapshotRegistry.createSnapshot(offset);
log.debug("Read-write operation {} will be completed when the log " +
@@ -513,7 +516,7 @@ public final class QuorumController implements Controller {
}
}
for (ApiMessage message : messages) {
- replay(message);
+ replay(message, offset);
}
} else {
// If the controller is active, the records were already replayed,
@@ -623,7 +626,7 @@ public final class QuorumController implements Controller {
}
@SuppressWarnings("unchecked")
- private void replay(ApiMessage message) {
+ private void replay(ApiMessage message, long offset) {
try {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
@@ -633,12 +636,6 @@ public final class QuorumController implements Controller {
case UNREGISTER_BROKER_RECORD:
clusterControl.replay((UnregisterBrokerRecord) message);
break;
- case FENCE_BROKER_RECORD:
- clusterControl.replay((FenceBrokerRecord) message);
- break;
- case UNFENCE_BROKER_RECORD:
- clusterControl.replay((UnfenceBrokerRecord) message);
- break;
case TOPIC_RECORD:
replicationControl.replay((TopicRecord) message);
break;
@@ -648,12 +645,24 @@ public final class QuorumController implements Controller {
case CONFIG_RECORD:
configurationControl.replay((ConfigRecord) message);
break;
- case QUOTA_RECORD:
- clientQuotaControlManager.replay((QuotaRecord) message);
- break;
case PARTITION_CHANGE_RECORD:
replicationControl.replay((PartitionChangeRecord) message);
break;
+ case FENCE_BROKER_RECORD:
+ clusterControl.replay((FenceBrokerRecord) message);
+ break;
+ case UNFENCE_BROKER_RECORD:
+ clusterControl.replay((UnfenceBrokerRecord) message);
+ break;
+ case REMOVE_TOPIC_RECORD:
+ replicationControl.replay((RemoveTopicRecord) message);
+ break;
+ case FEATURE_LEVEL_RECORD:
+ featureControl.replay((FeatureLevelRecord) message, offset);
+ break;
+ case QUOTA_RECORD:
+ clientQuotaControlManager.replay((QuotaRecord) message);
+ break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
@@ -793,6 +802,9 @@ public final class QuorumController implements Controller {
@Override
public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+ if (request.topics().isEmpty()) {
+ return CompletableFuture.completedFuture(new AlterIsrResponseData());
+ }
return appendWriteEvent("alterIsr", () ->
replicationControl.alterIsr(request));
}
@@ -800,6 +812,9 @@ public final class QuorumController implements Controller {
@Override
public CompletableFuture<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request) {
+ if (request.topics().isEmpty()) {
+ return CompletableFuture.completedFuture(new CreateTopicsResponseData());
+ }
return appendWriteEvent("createTopics", () ->
replicationControl.createTopics(request));
}
@@ -811,6 +826,27 @@ public final class QuorumController implements Controller {
}
@Override
+ public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+ if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+ return appendReadEvent("findTopicIds",
+ () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+ }
+
+ @Override
+ public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+ if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+ return appendReadEvent("findTopicNames",
+ () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+ }
+
+ @Override
+ public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+ if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+ return appendWriteEvent("deleteTopics",
+ () -> replicationControl.deleteTopics(ids));
+ }
+
+ @Override
public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
describeConfigs(Map<ConfigResource, Collection<String>> resources) {
return appendReadEvent("describeConfigs", () ->
@@ -847,7 +883,10 @@ public final class QuorumController implements Controller {
@Override
public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
- Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+ Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+ if (newConfigs.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
return appendWriteEvent("legacyAlterConfigs", () -> {
ControllerResult<Map<ConfigResource, ApiError>> result =
configurationControl.legacyAlterConfigs(newConfigs);
@@ -901,6 +940,9 @@ public final class QuorumController implements Controller {
@Override
public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
+ if (quotaAlterations.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
return appendWriteEvent("alterClientQuotas", () -> {
ControllerResult<Map<ClientQuotaEntity, ApiError>> result =
clientQuotaControlManager.alterClientQuotas(quotaAlterations);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index ca57105..59798c4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -21,28 +21,31 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData;
-import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
-import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
@@ -60,6 +63,7 @@ import org.slf4j.Logger;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,6 +76,9 @@ import java.util.Random;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
/**
@@ -92,10 +99,12 @@ public class ReplicationControlManager {
public static final int NO_LEADER_CHANGE = -2;
static class TopicControlInfo {
+ private final String name;
private final Uuid id;
private final TimelineHashMap<Integer, PartitionControlInfo> parts;
- TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+ TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
+ this.name = name;
this.id = id;
this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
}
@@ -261,7 +270,7 @@ public class ReplicationControlManager {
/**
* A reference to the controller's cluster control manager.
*/
- final ClusterControlManager clusterControl;
+ private final ClusterControlManager clusterControl;
/**
* Maps topic names to topic UUIDs.
@@ -299,7 +308,8 @@ public class ReplicationControlManager {
public void replay(TopicRecord record) {
topicsByName.put(record.name(), record.topicId());
- topics.put(record.topicId(), new TopicControlInfo(snapshotRegistry, record.topicId()));
+ topics.put(record.topicId(),
+ new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
log.info("Created topic {} with ID {}.", record.name(), record.topicId());
}
@@ -349,6 +359,29 @@ public class ReplicationControlManager {
log.debug("Applied ISR change record: {}", record.toString());
}
+ public void replay(RemoveTopicRecord record) {
+ // Remove this topic from the topics map and the topicsByName map.
+ TopicControlInfo topic = topics.remove(record.topicId());
+ if (topic == null) {
+ throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() +
+ " to remove.");
+ }
+ topicsByName.remove(topic.name);
+
+ // Delete the configurations associated with this topic.
+ configurationControl.deleteTopicConfigs(topic.name);
+
+ // Remove the entries for this topic in brokersToIsrs.
+ for (PartitionControlInfo partition : topic.parts.values()) {
+ for (int i = 0; i < partition.isr.length; i++) {
+ brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+ }
+ }
+ brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
+
+ log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+ }
+
ControllerResult<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request) {
Map<String, ApiError> topicErrors = new HashMap<>();
@@ -416,12 +449,12 @@ public class ReplicationControlManager {
Map<Integer, PartitionControlInfo> newParts = new HashMap<>();
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
- return new ApiError(Errors.INVALID_REQUEST,
+ return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but replication " +
"factor was not set to -1.");
}
if (topic.numPartitions() != -1) {
- return new ApiError(Errors.INVALID_REQUEST,
+ return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
@@ -458,7 +491,7 @@ public class ReplicationControlManager {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Replication factor was set to an invalid non-positive value.");
} else if (!topic.assignments().isEmpty()) {
- return new ApiError(Errors.INVALID_REQUEST,
+ return new ApiError(INVALID_REQUEST,
"Replication factor was not set to -1 but a manual partition " +
"assignment was specified.");
} else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
@@ -541,6 +574,68 @@ public class ReplicationControlManager {
return configChanges;
}
+ Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+ Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
+ for (String name : names) {
+ if (name == null) {
+ results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
+ } else {
+ Uuid id = topicsByName.get(name, offset);
+ if (id == null) {
+ results.put(name, new ResultOrError<>(
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+ } else {
+ results.put(name, new ResultOrError<>(id));
+ }
+ }
+ }
+ return results;
+ }
+
+ Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
+ Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
+ for (Uuid id : ids) {
+ if (id == null || id.equals(Uuid.ZERO_UUID)) {
+ results.put(id, new ResultOrError<>(new ApiError(INVALID_REQUEST,
+ "Attempt to find topic with invalid topicId " + id)));
+ } else {
+ TopicControlInfo topic = topics.get(id, offset);
+ if (topic == null) {
+ results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
+ } else {
+ results.put(id, new ResultOrError<>(topic.name));
+ }
+ }
+ }
+ return results;
+ }
+
+ ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+ Map<Uuid, ApiError> results = new HashMap<>(ids.size());
+ List<ApiMessageAndVersion> records = new ArrayList<>(ids.size());
+ for (Uuid id : ids) {
+ try {
+ deleteTopic(id, records);
+ results.put(id, ApiError.NONE);
+ } catch (ApiException e) {
+ results.put(id, ApiError.fromThrowable(e));
+ } catch (Exception e) {
+ log.error("Unexpected deleteTopics error for {}", id, e);
+ results.put(id, ApiError.fromThrowable(e));
+ }
+ }
+ return new ControllerResult<>(records, results);
+ }
+
+ void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+ TopicControlInfo topic = topics.get(id);
+ if (topic == null) {
+ throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+ }
+ records.add(new ApiMessageAndVersion(new RemoveTopicRecord().
+ setTopicId(id), (short) 0));
+ }
+
// VisibleForTesting
PartitionControlInfo getPartition(Uuid topicId, int partitionId) {
TopicControlInfo topic = topics.get(topicId);
@@ -568,7 +663,7 @@ public class ReplicationControlManager {
for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
}
continue;
}
@@ -578,13 +673,13 @@ public class ReplicationControlManager {
if (partition == null) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
continue;
}
if (request.brokerId() != partition.leader) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.INVALID_REQUEST.code()));
+ setErrorCode(INVALID_REQUEST.code()));
continue;
}
if (partitionData.leaderEpoch() != partition.leaderEpoch) {
@@ -603,14 +698,14 @@ public class ReplicationControlManager {
if (!Replicas.validateIsr(partition.replicas, newIsr)) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.INVALID_REQUEST.code()));
+ setErrorCode(INVALID_REQUEST.code()));
continue;
}
if (!Replicas.contains(newIsr, partition.leader)) {
// An alterIsr request can't remove the current leader.
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.INVALID_REQUEST.code()));
+ setErrorCode(INVALID_REQUEST.code()));
continue;
}
records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
@@ -797,17 +892,17 @@ public class ReplicationControlManager {
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
- return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such topic as " + topic);
}
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) {
- return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such topic id as " + topicId);
}
PartitionControlInfo partitionInfo = topicInfo.parts.get(partitionId);
if (partitionInfo == null) {
- return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, unclean);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
index 6d910e4..2fedacd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.requests.ApiError;
import java.util.Objects;
-class ResultOrError<T> {
+public class ResultOrError<T> {
private final ApiError error;
private final T result;
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index a8ea94d..2f5d7be 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -348,7 +348,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
return prev;
}
- Object snapshottableRemove(Object object) {
+ T snapshottableRemove(Object object) {
T prev = baseRemove(object);
if (prev == null) {
return null;
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
index bbec158..6e02517 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
@@ -164,9 +164,10 @@ public class TimelineHashMap<K, V>
}
@Override
- @SuppressWarnings("unchecked")
public V remove(Object key) {
- return (V) snapshottableRemove(new TimelineHashMapEntry<>(key, null));
+ TimelineHashMapEntry<K, V> result = snapshottableRemove(
+ new TimelineHashMapEntry<>(key, null));
+ return result == null ? null : result.value;
}
@Override
@@ -342,7 +343,7 @@ public class TimelineHashMap<K, V>
if (epoch != SnapshottableHashTable.LATEST_EPOCH) {
throw new RuntimeException("can't modify snapshot");
}
- return snapshottableRemove(o) != null;
+ return snapshottableRemove(new TimelineHashMapEntry<>(o, null)) != null;
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index a639614..da6e4af 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
@@ -55,37 +57,50 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
import static org.apache.kafka.controller.ReplicationControlManager.PartitionControlInfo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(40)
public class ReplicationControlManagerTest {
- private static ReplicationControlManager newReplicationControlManager() {
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- LogContext logContext = new LogContext();
- MockTime time = new MockTime();
- MockRandom random = new MockRandom();
- ClusterControlManager clusterControl = new ClusterControlManager(
+ private static class ReplicationControlTestContext {
+ final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+ final LogContext logContext = new LogContext();
+ final MockTime time = new MockTime();
+ final MockRandom random = new MockRandom();
+ final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, 1000,
new SimpleReplicaPlacementPolicy(random));
- clusterControl.activate();
- ConfigurationControlManager configurationControl = new ConfigurationControlManager(
+ final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap());
- return new ReplicationControlManager(snapshotRegistry,
+ final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry,
new LogContext(),
random,
(short) 3,
1,
configurationControl,
clusterControl);
+
+ void replay(List<ApiMessageAndVersion> records) throws Exception {
+ ControllerTestUtils.replayAll(clusterControl, records);
+ ControllerTestUtils.replayAll(configurationControl, records);
+ ControllerTestUtils.replayAll(replicationControl, records);
+ }
+
+ ReplicationControlTestContext() {
+ clusterControl.activate();
+ }
}
- private static void registerBroker(int brokerId, ReplicationControlManager replicationControl) {
+ private static void registerBroker(int brokerId, ReplicationControlTestContext ctx) {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
@@ -93,22 +108,23 @@ public class ReplicationControlManagerTest {
setPort((short) 9092 + brokerId).
setName("PLAINTEXT").
setHost("localhost"));
- replicationControl.clusterControl.replay(brokerRecord);
+ ctx.clusterControl.replay(brokerRecord);
}
private static void unfenceBroker(int brokerId,
- ReplicationControlManager replicationControl) throws Exception {
- ControllerResult<BrokerHeartbeatReply> result = replicationControl.
+ ReplicationControlTestContext ctx) throws Exception {
+ ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
processBrokerHeartbeat(new BrokerHeartbeatRequestData().
setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).setCurrentMetadataOffset(1).
setWantFence(false).setWantShutDown(false), 0);
assertEquals(new BrokerHeartbeatReply(true, false, false, false), result.response());
- ControllerTestUtils.replayAll(replicationControl.clusterControl, result.records());
+ ControllerTestUtils.replayAll(ctx.clusterControl, result.records());
}
@Test
public void testCreateTopics() throws Exception {
- ReplicationControlManager replicationControl = newReplicationControlManager();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(-1).setReplicationFactor((short) -1));
@@ -120,12 +136,12 @@ public class ReplicationControlManagerTest {
setErrorMessage("Unable to replicate the partition 3 times: there are only 0 usable brokers"));
assertEquals(expectedResponse, result.response());
- registerBroker(0, replicationControl);
- unfenceBroker(0, replicationControl);
- registerBroker(1, replicationControl);
- unfenceBroker(1, replicationControl);
- registerBroker(2, replicationControl);
- unfenceBroker(2, replicationControl);
+ registerBroker(0, ctx);
+ unfenceBroker(0, ctx);
+ registerBroker(1, ctx);
+ unfenceBroker(1, ctx);
+ registerBroker(2, ctx);
+ unfenceBroker(2, ctx);
ControllerResult<CreateTopicsResponseData> result2 =
replicationControl.createTopics(request);
CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
@@ -189,10 +205,11 @@ public class ReplicationControlManagerTest {
@Test
public void testRemoveLeaderships() throws Exception {
- ReplicationControlManager replicationControl = newReplicationControlManager();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
for (int i = 0; i < 6; i++) {
- registerBroker(i, replicationControl);
- unfenceBroker(i, replicationControl);
+ registerBroker(i, ctx);
+ unfenceBroker(i, ctx);
}
CreatableTopicResult result = createTestTopic(replicationControl, "foo",
new int[][] {
@@ -215,10 +232,11 @@ public class ReplicationControlManagerTest {
@Test
public void testShrinkAndExpandIsr() throws Exception {
- ReplicationControlManager replicationControl = newReplicationControlManager();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
for (int i = 0; i < 3; i++) {
- registerBroker(i, replicationControl);
- unfenceBroker(i, replicationControl);
+ registerBroker(i, ctx);
+ unfenceBroker(i, ctx);
}
CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
new int[][] {new int[] {0, 1, 2}});
@@ -226,13 +244,13 @@ public class ReplicationControlManagerTest {
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
TopicPartition topicPartition = new TopicPartition("foo", 0);
assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
- long brokerEpoch = currentBrokerEpoch(replicationControl, 0);
+ long brokerEpoch = currentBrokerEpoch(ctx, 0);
AlterIsrRequestData.PartitionData shrinkIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse(
- shrinkIsrResult, topicPartition, Errors.NONE);
+ shrinkIsrResult, topicPartition, NONE);
assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
AlterIsrRequestData.PartitionData expandIsrRequest = newAlterIsrPartition(
@@ -240,16 +258,17 @@ public class ReplicationControlManagerTest {
ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse(
- expandIsrResult, topicPartition, Errors.NONE);
+ expandIsrResult, topicPartition, NONE);
assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, expandIsrResponse);
}
@Test
public void testInvalidAlterIsrRequests() throws Exception {
- ReplicationControlManager replicationControl = newReplicationControlManager();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
for (int i = 0; i < 3; i++) {
- registerBroker(i, replicationControl);
- unfenceBroker(i, replicationControl);
+ registerBroker(i, ctx);
+ unfenceBroker(i, ctx);
}
CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
new int[][] {new int[] {0, 1, 2}});
@@ -257,13 +276,13 @@ public class ReplicationControlManagerTest {
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
TopicPartition topicPartition = new TopicPartition("foo", 0);
assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
- long brokerEpoch = currentBrokerEpoch(replicationControl, 0);
+ long brokerEpoch = currentBrokerEpoch(ctx, 0);
// Invalid leader
AlterIsrRequestData.PartitionData invalidLeaderRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+ replicationControl, 1, currentBrokerEpoch(ctx, 1),
"foo", invalidLeaderRequest);
assertAlterIsrResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
@@ -278,7 +297,7 @@ public class ReplicationControlManagerTest {
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidLeaderEpochRequest.setLeaderEpoch(500);
ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+ replicationControl, 1, currentBrokerEpoch(ctx, 1),
"foo", invalidLeaderEpochRequest);
assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, Errors.INVALID_REQUEST);
@@ -287,7 +306,7 @@ public class ReplicationControlManagerTest {
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+ replicationControl, 1, currentBrokerEpoch(ctx, 1),
"foo", invalidIsrRequest1);
assertAlterIsrResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
@@ -296,16 +315,16 @@ public class ReplicationControlManagerTest {
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+ replicationControl, 1, currentBrokerEpoch(ctx, 1),
"foo", invalidIsrRequest2);
assertAlterIsrResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
}
private long currentBrokerEpoch(
- ReplicationControlManager replicationControl,
+ ReplicationControlTestContext ctx,
int brokerId
) {
- Map<Integer, BrokerRegistration> registrations = replicationControl.clusterControl.brokerRegistrations();
+ Map<Integer, BrokerRegistration> registrations = ctx.clusterControl.brokerRegistrations();
BrokerRegistration registration = registrations.get(brokerId);
assertNotNull(registration, "No current registration for broker " + brokerId);
return registration.epoch();
@@ -391,4 +410,100 @@ public class ReplicationControlManagerTest {
assertEquals(expectedIsr, partitionData.isr());
}
+ private void assertCreatedTopicConfigs(
+ ReplicationControlTestContext ctx,
+ String topic,
+ CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs
+ ) {
+ Map<String, String> configs = ctx.configurationControl.getConfigs(
+ new ConfigResource(ConfigResource.Type.TOPIC, topic));
+ assertEquals(requestConfigs.size(), configs.size());
+ for (CreateTopicsRequestData.CreateableTopicConfig requestConfig : requestConfigs) {
+ String value = configs.get(requestConfig.name());
+ assertEquals(requestConfig.value(), value);
+ }
+ }
+
+ private void assertEmptyTopicConfigs(
+ ReplicationControlTestContext ctx,
+ String topic
+ ) {
+ Map<String, String> configs = ctx.configurationControl.getConfigs(
+ new ConfigResource(ConfigResource.Type.TOPIC, topic));
+ assertEquals(Collections.emptyMap(), configs);
+ }
+
+ @Test
+ public void testDeleteTopics() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs =
+ new CreateTopicsRequestData.CreateableTopicConfigCollection();
+ requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().
+ setName("cleanup.policy").setValue("compact"));
+ requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().
+ setName("min.cleanable.dirty.ratio").setValue("0.1"));
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(3).setReplicationFactor((short) 2).
+ setConfigs(requestConfigs));
+ registerBroker(0, ctx);
+ unfenceBroker(0, ctx);
+ registerBroker(1, ctx);
+ unfenceBroker(1, ctx);
+ ControllerResult<CreateTopicsResponseData> result =
+ replicationControl.createTopics(request);
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ Uuid topicId = result.response().topics().find("foo").topicId();
+ expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(3).setReplicationFactor((short) 2).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(topicId));
+ assertEquals(expectedResponse, result.response());
+ // Until the records are replayed, no changes are made
+ assertNull(replicationControl.getPartition(topicId, 0));
+ assertEmptyTopicConfigs(ctx, "foo");
+ ctx.replay(result.records());
+ assertNotNull(replicationControl.getPartition(topicId, 0));
+ assertNotNull(replicationControl.getPartition(topicId, 1));
+ assertNotNull(replicationControl.getPartition(topicId, 2));
+ assertNull(replicationControl.getPartition(topicId, 3));
+ assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
+
+ assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")),
+ replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
+ assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)),
+ replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
+ Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1,
+ topicId.getLeastSignificantBits());
+ assertEquals(Collections.singletonMap(invalidId,
+ new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))),
+ replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
+ assertEquals(Collections.singletonMap("bar",
+ new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
+ replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
+
+ ControllerResult<Map<Uuid, ApiError>> result1 = replicationControl.
+ deleteTopics(Collections.singletonList(invalidId));
+ assertEquals(0, result1.records().size());
+ assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
+ result1.response());
+ ControllerResult<Map<Uuid, ApiError>> result2 = replicationControl.
+ deleteTopics(Collections.singletonList(topicId));
+ assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
+ result2.response());
+ assertEquals(1, result2.records().size());
+ ctx.replay(result2.records());
+ assertNull(replicationControl.getPartition(topicId, 0));
+ assertNull(replicationControl.getPartition(topicId, 1));
+ assertNull(replicationControl.getPartition(topicId, 2));
+ assertNull(replicationControl.getPartition(topicId, 3));
+ assertEquals(Collections.singletonMap(topicId, new ResultOrError<>(
+ new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(
+ Long.MAX_VALUE, Collections.singleton(topicId)));
+ assertEquals(Collections.singletonMap("foo", new ResultOrError<>(
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(
+ Long.MAX_VALUE, Collections.singleton("foo")));
+ assertEmptyTopicConfigs(ctx, "foo");
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
index bb5fe9d..19edceb 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
@@ -96,6 +96,8 @@ public class TimelineHashMapTest {
assertEquals("xyz", map.putIfAbsent(1, "ghi"));
map.putAll(Collections.singletonMap(2, "b"));
assertTrue(map.containsKey(2));
+ assertEquals("xyz", map.remove(1));
+ assertEquals("b", map.remove(2));
}
@Test