Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 21:42:41 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583886777



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment:
       There seems to be enough complexity in the handling here that it might be worth pulling this logic into a separate class. Not required for this PR, but it would be nice to come up with a nicer pattern so that we don't end up with a giant class like `KafkaApis`.
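
      One possible shape for that extraction, as a minimal sketch: a dedicated handler class that takes the controller and the `delete.topic.enable` flag, leaving `ControllerApis` responsible only for request parsing, the authorization callbacks, and throttling. The class and its name (`ControllerDeleteTopicsHandler`) are hypothetical, not something this PR introduces.

```scala
import java.util

import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
import org.apache.kafka.controller.Controller

// Hypothetical helper: owns the DeleteTopics handling so that ControllerApis
// stays a thin dispatch-and-authorize layer instead of growing like KafkaApis.
class ControllerDeleteTopicsHandler(controller: Controller,
                                    deleteTopicEnable: Boolean) {

  def deleteTopics(request: DeleteTopicsRequestData,
                   apiVersion: Int,
                   hasClusterAuth: Boolean,
                   getDescribableTopics: Iterable[String] => Set[String],
                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
    // The body of deleteTopics() from this diff would move here unchanged;
    // an empty result keeps the sketch compilable.
    new util.ArrayList[DeletableTopicResult]
  }
}
```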

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())

Review comment:
       If the controller does not have the topic id mapping, then the error here will be `UNKNOWN_TOPIC_OR_PARTITION`. As far as I can tell, this would get returned in the response to the client. This behavior differs from the handling logic in `KafkaApis` where we always check authorization first. The problem is that this implicitly leaks topic existence.
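
      A hedged sketch of one way to close that gap, following the `KafkaApis` ordering: compute the describe-authorization result for the requested names first (called `describableTopics` below, e.g. the output of `getDescribableTopics`; the name is illustrative) and only surface the lookup error to clients that may describe the topic. `TOPIC_AUTHORIZATION_FAILED` is `Errors.TOPIC_AUTHORIZATION_FAILED`, assumed to be statically imported like `INVALID_REQUEST` in this diff.

```scala
// Sketch only: consult describe authorization before surfacing a lookup error,
// so a client without DESCRIBE cannot tell "missing" apart from "not visible".
controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
  case (name, idOrError) =>
    if (idOrError.isError) {
      if (describableTopics.contains(name)) {
        // Authorized to describe: safe to return the real lookup error.
        appendResponse(name, ZERO_UUID, idOrError.error())
      } else {
        // Not authorized to describe: mask existence behind an authorization error.
        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
      }
    } else {
      maybeAppendToIdToName(idOrError.result(), name)
    }
}
```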

##########
File path: core/src/test/java/kafka/test/MockController.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+    private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+        new NotControllerException("This is not the correct controller for this cluster.");
+
+    public static class Builder {
+        private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+        public Builder newInitialTopic(String name, Uuid id) {
+            initialTopics.put(name, new MockTopic(name, id));
+            return this;
+        }
+
+        public MockController build() {
+            return new MockController(initialTopics.values());
+        }
+    }
+
+    private volatile boolean active = true;
+
+    private MockController(Collection<MockTopic> initialTopics) {
+        for (MockTopic topic : initialTopics) {
+            topics.put(topic.id, topic);
+            topicNameToId.put(topic.name, topic.id);
+        }
+    }
+
+    @Override
+    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> unregisterBroker(int brokerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    static class MockTopic {
+        private final String name;
+        private final Uuid id;
+
+        MockTopic(String name, Uuid id) {
+            this.name = name;
+            this.id = id;
+        }
+    }
+
+    private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+    private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+    @Override
+    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+            findTopicIds(Collection<String> topicNames) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String topicName : topicNames) {
+            if (!topicNameToId.containsKey(topicName)) {
+                System.out.println("WATERMELON: findTopicIds failed to find " + topicName);

Review comment:
      Guessing you aren't planning to commit this 😉.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {

Review comment:
       nit: can you use `Implicits.forKeyValue`?
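
      For reference, a sketch of what the nit is asking for, assuming `kafka.utils.Implicits._` is imported in this file: `forKeyValue` iterates the map entries directly instead of pattern-matching `(key, value)` tuples.

```scala
import kafka.utils.Implicits._

// Same traversal as above, without the `case (name, idOrError)` tuple match.
controller.findTopicIds(topicNamesToResolve).get().asScala.forKeyValue { (name, idOrError) =>
  if (idOrError.isError) {
    appendResponse(name, ZERO_UUID, idOrError.error())
  } else {
    maybeAppendToIdToName(idOrError.result(), name)
  }
}
```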

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+              "The same topic was specified by name and by id."))
+          idToName.remove(id)
+          duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())
+      } else {
+        maybeAppendToIdToName(idOrError.result(), name)
+      }
+    }
+    controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+      case (id, nameOrError) => if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        maybeAppendToIdToName(id, nameOrError.result())
+      }
+    }
+
+    if (!hasClusterAuth) {
+      val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala)
+      val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala)
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val topicName = entry.getValue
+        if (!authorizedDeleteTopics.contains(topicName)) {
+          val topicId = entry.getKey
+          if (!authorizedDescribeTopics.contains(topicName)) {
+            // If the user is not authorized to describe the topic, we pretend that it

Review comment:
      The error should be `TOPIC_AUTHORIZATION_FAILED` if the client does not have describe permission, regardless of whether the topic exists.
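
      Sketched against the names already in this diff (`entry`, `topicName`, `authorizedDescribeTopics`, `authorizedDeleteTopics`, `appendResponse`), one hedged reading of that suggestion is below; `TOPIC_AUTHORIZATION_FAILED` is `Errors.TOPIC_AUTHORIZATION_FAILED`, and the exact masking of the returned name/id is a separate question.

```scala
// Sketch only: the DESCRIBE check decides the error, independent of existence.
if (!authorizedDeleteTopics.contains(topicName)) {
  val topicId = entry.getKey
  if (!authorizedDescribeTopics.contains(topicName)) {
    // No DESCRIBE permission: answer TOPIC_AUTHORIZATION_FAILED whether or not
    // the topic exists, so the response carries no existence information.
    appendResponse(topicName, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
  } else {
    // DESCRIBE but not DELETE permission: still an authorization failure.
    appendResponse(topicName, topicId, new ApiError(TOPIC_AUTHORIZATION_FAILED))
  }
}
```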




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org