You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/12/02 02:34:38 UTC

[kafka] branch trunk updated: KAFKA-14358; Disallow creation of cluster metadata topic (#12885)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b409de1e27 KAFKA-14358; Disallow creation of cluster metadata topic (#12885)
9b409de1e27 is described below

commit 9b409de1e2719f72e3c17663ce131e40dc459361
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Thu Dec 1 18:34:29 2022 -0800

    KAFKA-14358; Disallow creation of cluster metadata topic (#12885)
    
    With KRaft the cluster metadata topic (__cluster_metadata) has a different implementation compared to regular topic. The user should not be allowed to create this topic. This can cause issues if the metadata log dir is the same as one of the log dirs.
    
    This change returns an authorization error if the user tries to create the cluster metadata topic.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 14 ++++-----
 .../org/apache/kafka/common/internals/Topic.java   |  7 +++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  4 +--
 .../main/scala/kafka/server/ControllerApis.scala   | 19 +++++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 36 ++++++++++++++++++----
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  4 +--
 .../server/AbstractCreateTopicsRequestTest.scala   |  4 +--
 .../unit/kafka/server/ControllerApisTest.scala     |  3 +-
 .../kafka/server/CreateTopicsRequestTest.scala     | 11 ++++++-
 9 files changed, 75 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 92e41ed1f2d..d6ac23b2382 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -273,8 +273,8 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_NAME;
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_PARTITION;
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME;
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -4375,7 +4375,7 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
                 return new Builder(DescribeQuorumRequest.singletonRequest(
-                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+                        new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition())));
             }
 
             @Override
@@ -4391,9 +4391,9 @@ public class KafkaAdminClient extends AdminClient {
                     throw new UnknownServerException(msg);
                 }
                 DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
-                if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) {
                     String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected",
-                            topic.topicName(), METADATA_TOPIC_NAME);
+                            topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
@@ -4404,9 +4404,9 @@ public class KafkaAdminClient extends AdminClient {
                     throw new UnknownServerException(msg);
                 }
                 DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
-                if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                if (partition.partitionIndex() != CLUSTER_METADATA_TOPIC_PARTITION.partition()) {
                     String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected",
-                            partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                            partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition());
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index 92952a2c031..fe4a01c52a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -28,8 +28,11 @@ public class Topic {
 
     public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
     public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
-    public static final String METADATA_TOPIC_NAME = "__cluster_metadata";
-    public static final TopicPartition METADATA_TOPIC_PARTITION = new TopicPartition(METADATA_TOPIC_NAME, 0);
+    public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";
+    public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
+        CLUSTER_METADATA_TOPIC_NAME,
+        0
+    );
     public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
 
     private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 293a00dce2e..d050f08e320 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -662,8 +662,8 @@ public class KafkaAdminClientTest {
             Boolean partitionCountError,
             Boolean partitionIndexError,
             Boolean emptyOptionals) {
-        String topicName = topicNameError ? "RANDOM" : Topic.METADATA_TOPIC_NAME;
-        Integer partitionIndex = partitionIndexError ? 1 : Topic.METADATA_TOPIC_PARTITION.partition();
+        String topicName = topicNameError ? "RANDOM" : Topic.CLUSTER_METADATA_TOPIC_NAME;
+        Integer partitionIndex = partitionIndexError ? 1 : Topic.CLUSTER_METADATA_TOPIC_PARTITION.partition();
         List<DescribeQuorumResponseData.TopicData> topics = new ArrayList<>();
         List<DescribeQuorumResponseData.PartitionData> partitions = new ArrayList<>();
         for (int i = 0; i < (partitionCountError ? 2 : 1); i++) {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 235660cec0a..657c2965533 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_A
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
 import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@@ -374,12 +375,21 @@ class ControllerApis(val requestChannel: RequestChannel,
         }
       }
     }
+
+    /* The cluster metatdata topic is an internal topic with a different implementation. The user should not be
+     * allowed to create it as a regular topic.
+     */
+    if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
+      info(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}")
+    }
+    val allowedTopicNames = topicNames.asScala.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
+
     val authorizedTopicNames = if (hasClusterAuth) {
-      topicNames.asScala
+      allowedTopicNames
     } else {
-      getCreatableTopics.apply(topicNames.asScala)
+      getCreatableTopics.apply(allowedTopicNames)
     }
-    val describableTopicNames = getDescribableTopics.apply(topicNames.asScala).asJava
+    val describableTopicNames = getDescribableTopics.apply(allowedTopicNames).asJava
     val effectiveRequest = request.duplicate()
     val iterator = effectiveRequest.topics().iterator()
     while (iterator.hasNext) {
@@ -400,7 +410,8 @@ class ControllerApis(val requestChannel: RequestChannel,
         if (!authorizedTopicNames.contains(name)) {
           response.topics().add(new CreatableTopicResult().
             setName(name).
-            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
+            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+            setErrorMessage("Authorization failed."))
         }
       }
       response
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0913ffff4d3..d8cc117d189 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1898,12 +1898,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
         logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else authHelper.filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+
+      val allowedTopicNames = {
+        val topicNames = createTopicsRequest
+          .data
+          .topics
+          .asScala
+          .map(_.name)
+          .toSet
+
+          /* The cluster metatdata topic is an internal topic with a different implementation. The user should not be
+           * allowed to create it as a regular topic.
+           */
+          if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
+            info(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}")
+          }
+          topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
+      }
+
+      val authorizedTopics = if (hasClusterAuthorization) {
+        allowedTopicNames.toSet
+      } else {
+        authHelper.filterByAuthorized(request.context, CREATE, TOPIC, allowedTopicNames)(identity)
+      }
+      val authorizedForDescribeConfigs = authHelper.filterByAuthorized(
+        request.context,
+        DESCRIBE_CONFIGS,
+        TOPIC,
+        allowedTopicNames,
+        logIfDenied = false
+      )(identity).map(name => name -> results.find(name)).toMap
 
       results.forEach { topic =>
         if (results.findAll(topic.name).size > 1) {
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 84ea10da57f..76a874b2197 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -161,8 +161,8 @@ class KafkaRaftServer(
 }
 
 object KafkaRaftServer {
-  val MetadataTopic = Topic.METADATA_TOPIC_NAME
-  val MetadataPartition = Topic.METADATA_TOPIC_PARTITION
+  val MetadataTopic = Topic.CLUSTER_METADATA_TOPIC_NAME
+  val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
   val MetadataTopicId = Uuid.METADATA_TOPIC_ID
 
   sealed trait ProcessRole
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index ecee2cd19c4..627a939ddcf 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -163,9 +163,9 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
       if (actual == null) {
         throw new RuntimeException(s"No response data found for topic $topicName")
       }
-      assertEquals(expected.error.code(), actual.errorCode(), "The response error should match")
+      assertEquals(expected.error.code(), actual.errorCode(), "The response error code should match")
       if (checkErrorMessage) {
-        assertEquals(expected.message, actual.errorMessage())
+        assertEquals(expected.message, actual.errorMessage(), "The response error message should match")
       }
       // If no error validate topic exists
       if (expectedError.isSuccess && !request.data.validateOnly) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 05bd13d795d..969d57c0dd2 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -554,7 +554,8 @@ class ControllerApisTest {
         setTopicId(new Uuid(0L, 2L)).
         setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
       new CreatableTopicResult().setName("quux").
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+        setErrorMessage("Authorization failed."))
     assertEquals(expectedResponse, controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
       false,
       _ => Set("baz", "indescribable"),
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 57834234cc1..a193db284c4 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import kafka.utils._
 import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
 import org.apache.kafka.common.protocol.ApiKeys
@@ -27,7 +28,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-
 import scala.jdk.CollectionConverters._
 
 class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@@ -197,4 +197,13 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
         assertEquals(Uuid.ZERO_UUID, topicResponse.topicId())
     }
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateClusterMetadataTopic(quorum: String): Unit = {
+    validateErrorCreateTopicsRequests(
+      topicsReq(Seq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME))),
+      Map(Topic.CLUSTER_METADATA_TOPIC_NAME -> error(Errors.TOPIC_AUTHORIZATION_FAILED, Some("Authorization failed.")))
+    )
+  }
 }