You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/04/13 18:59:45 UTC

[kafka] branch trunk updated: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 87aa8259dd KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910
87aa8259dd is described below

commit 87aa8259ddf5d4e5ed75aa41c4ba2ad65e2624a6
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Mar 17 16:25:45 2022 +0800

    KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910
    
    In ZK mode, a topic named "foo_bar" cannot be created while "foo.bar" exists (and vice versa),
    because the two names collide in metric names. KRaft mode should enforce the same collision check.
    This PR also changes TopicCommandIntegrationTest to support KRaft mode.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../org/apache/kafka/common/internals/Topic.java   |  13 +-
 .../apache/kafka/common/internals/TopicTest.java   |   9 +
 .../kafka/admin/TopicCommandIntegrationTest.scala  | 306 +++++++++++++--------
 .../controller/ReplicationControlManager.java      |  51 +++-
 .../controller/ReplicationControlManagerTest.java  |  21 +-
 5 files changed, 278 insertions(+), 122 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index 7a5fefb3d9..3c93ef87b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -67,6 +67,17 @@ public class Topic {
         return topic.contains("_") || topic.contains(".");
     }
 
+    /**
+     * Unify topic name with a period ('.') or underscore ('_'), this is only used to check collision and will not
+     * be used to really change topic name.
+     *
+     * @param topic A topic to unify
+     * @return A unified topic name
+     */
+    public static String unifyCollisionChars(String topic) {
+        return topic.replace('.', '_');
+    }
+
     /**
      * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position.
      *
@@ -75,7 +86,7 @@ public class Topic {
      * @return true if the topics collide
      */
     public static boolean hasCollision(String topicA, String topicB) {
-        return topicA.replace('.', '_').equals(topicB.replace('.', '_'));
+        return unifyCollisionChars(topicA).equals(unifyCollisionChars(topicB));
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
index 9bf237fb1b..03c0811fa4 100644
--- a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -81,6 +82,14 @@ public class TopicTest {
             assertTrue(Topic.hasCollisionChars(topic));
     }
 
+    @Test
+    public void testUnifyCollisionChars() {
+        assertEquals("topic", Topic.unifyCollisionChars("topic"));
+        assertEquals("_topic", Topic.unifyCollisionChars(".topic"));
+        assertEquals("_topic", Topic.unifyCollisionChars("_topic"));
+        assertEquals("__topic", Topic.unifyCollisionChars("_.topic"));
+    }
+
     @Test
     public void testTopicHasCollision() {
         List<String> periodFirstMiddleLastNone = Arrays.asList(".topic", "to.pic", "topic.", "topic");
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 9a1fe378f6..ee7e64957e 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -17,23 +17,24 @@
 package kafka.admin
 
 import java.util.{Collection, Collections, Optional, Properties}
-
 import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService}
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.{ConfigType, KafkaConfig}
-import kafka.utils.{Logging, TestUtils}
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, ThrottlingQuotaExceededException, TopicExistsException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, ThrottlingQuotaExceededException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatcher
 import org.mockito.ArgumentMatchers.{eq => eqThat, _}
 import org.mockito.Mockito._
@@ -54,7 +55,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     */
   override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(
     numConfigs = 6,
-    zkConnect = zkConnect,
+    zkConnect = zkConnectOrNull,
     rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"),
     numPartitions = numPartitions,
     defaultReplicationFactor = defaultReplicationFactor,
@@ -76,7 +77,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   }
 
   private[this] def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = {
-    TestUtils.waitForPartitionMetadata(servers, topicName, partition = 0, timeout)
+    TestUtils.waitForPartitionMetadata(brokers, topicName, partition = 0, timeout)
   }
 
   @BeforeEach
@@ -98,16 +99,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
       topicService.close()
   }
 
-  @Test
-  def testCreate(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreate(quorum: String): Unit = {
     createAndWaitTopic(new TopicCommandOptions(
       Array("--partitions", "2", "--replication-factor", "1", "--topic", testTopicName)))
 
     adminClient.listTopics().names().get().contains(testTopicName)
   }
 
-  @Test
-  def testCreateWithDefaults(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithDefaults(quorum: String): Unit = {
     createAndWaitTopic(new TopicCommandOptions(Array("--topic", testTopicName)))
 
     val partitions = adminClient
@@ -120,8 +123,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
   }
 
-  @Test
-  def testCreateWithDefaultReplication(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithDefaultReplication(quorum: String): Unit = {
     createAndWaitTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--partitions", "2")))
 
@@ -135,8 +139,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
   }
 
-  @Test
-  def testCreateWithDefaultPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithDefaultPartitions(quorum: String): Unit = {
     createAndWaitTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--replication-factor", "2")))
 
@@ -151,8 +156,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertEquals(partitions.get(0).replicas().size(), 2)
   }
 
-  @Test
-  def testCreateWithConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithConfigs(quorum: String): Unit = {
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)
     createAndWaitTopic(new TopicCommandOptions(
       Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", "delete.retention.ms=1000")))
@@ -163,8 +169,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()))
   }
 
-  @Test
-  def testCreateWhenAlreadyExists(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWhenAlreadyExists(quorum: String): Unit = {
     val numPartitions = 1
 
     // create the topic
@@ -176,15 +183,17 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertThrows(classOf[TopicExistsException], () => topicService.createTopic(createOpts))
   }
 
-  @Test
-  def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWhenAlreadyExistsWithIfNotExists(quorum: String): Unit = {
     val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
     createAndWaitTopic(createOpts)
     topicService.createTopic(createOpts)
   }
 
-  @Test
-  def testCreateWithReplicaAssignment(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithReplicaAssignment(quorum: String): Unit = {
     // create the topic
     val createOpts = new TopicCommandOptions(
       Array("--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName))
@@ -202,37 +211,42 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertEquals(List(1, 0), partitions.get(2).replicas().asScala.map(_.id()))
   }
 
-  @Test
-  def testCreateWithInvalidReplicationFactor(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithInvalidReplicationFactor(quorum: String): Unit = {
     assertThrows(classOf[IllegalArgumentException],
       () => topicService.createTopic(new TopicCommandOptions(
         Array("--partitions", "2", "--replication-factor", (Short.MaxValue+1).toString, "--topic", testTopicName))))
   }
 
-  @Test
-  def testCreateWithNegativeReplicationFactor(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithNegativeReplicationFactor(quorum: String): Unit = {
     assertThrows(classOf[IllegalArgumentException],
       () => topicService.createTopic(new TopicCommandOptions(
         Array("--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName))))
   }
 
-  @Test
-  def testCreateWithNegativePartitionCount(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithNegativePartitionCount(quorum: String): Unit = {
     assertThrows(classOf[IllegalArgumentException],
       () => topicService.createTopic(new TopicCommandOptions(
         Array("--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName))))
   }
 
-  @Test
-  def testInvalidTopicLevelConfig(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidTopicLevelConfig(quorum: String): Unit = {
     val createOpts = new TopicCommandOptions(
       Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName,
         "--config", "message.timestamp.type=boom"))
     assertThrows(classOf[ConfigException], () => topicService.createTopic(createOpts))
   }
 
-  @Test
-  def testListTopics(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTopics(quorum: String): Unit = {
     createAndWaitTopic(new TopicCommandOptions(
       Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
 
@@ -242,8 +256,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertTrue(output.contains(testTopicName))
   }
 
-  @Test
-  def testListTopicsWithIncludeList(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTopicsWithIncludeList(quorum: String): Unit = {
     val topic1 = "kafka.testTopic1"
     val topic2 = "kafka.testTopic2"
     val topic3 = "oooof.testTopic1"
@@ -264,8 +279,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertFalse(output.contains(topic3))
   }
 
-  @Test
-  def testListTopicsWithExcludeInternal(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListTopicsWithExcludeInternal(quorum: String): Unit = {
     val topic1 = "kafka.testTopic1"
     adminClient.createTopics(
       List(new NewTopic(topic1, 2, 2.toShort),
@@ -280,8 +296,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
   }
 
-  @Test
-  def testAlterPartitionCount(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterPartitionCount(quorum: String): Unit = {
     adminClient.createTopics(
       List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get()
     waitForTopicCreated(testTopicName)
@@ -289,12 +306,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--partitions", "3")))
 
+    TestUtils.waitUntilTrue(
+      () => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
+      "Timeout waiting new assignment propagate to broker")
     val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
     assertTrue(topicDescription.partitions().size() == 3)
   }
 
-  @Test
-  def testAlterAssignment(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterAssignment(quorum: String): Unit = {
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get()
     waitForTopicCreated(testTopicName)
@@ -307,8 +328,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertEquals(List(4,2), topicDescription.partitions().get(2).replicas().asScala.map(_.id()))
   }
 
-  @Test
-  def testAlterAssignmentWithMoreAssignmentThanPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): Unit = {
     adminClient.createTopics(
       List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get()
     waitForTopicCreated(testTopicName)
@@ -318,8 +340,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
         Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3"))))
   }
 
-  @Test
-  def testAlterAssignmentWithMorePartitionsThanAssignment(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): Unit = {
     adminClient.createTopics(
       List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get()
     waitForTopicCreated(testTopicName)
@@ -329,8 +352,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
         Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6"))))
   }
 
-  @Test
-  def testAlterWithInvalidPartitionCount(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterWithInvalidPartitionCount(quorum: String): Unit = {
     createAndWaitTopic(new TopicCommandOptions(
       Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
 
@@ -339,22 +363,25 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
         Array("--partitions", "-1", "--topic", testTopicName))))
   }
 
-  @Test
-  def testAlterWhenTopicDoesntExist(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterWhenTopicDoesntExist(quorum: String): Unit = {
     // alter a topic that does not exist without --if-exists
     val alterOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--partitions", "1"))
     val topicService = TopicService(adminClient)
     assertThrows(classOf[IllegalArgumentException], () => topicService.alterTopic(alterOpts))
   }
 
-  @Test
-  def testAlterWhenTopicDoesntExistWithIfExists(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAlterWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--partitions", "1", "--if-exists")))
   }
 
-  @Test
-  def testCreateAlterTopicWithRackAware(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateAlterTopicWithRackAware(quorum: String): Unit = {
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
 
     val numPartitions = 18
@@ -365,9 +392,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
       "--topic", testTopicName))
     createAndWaitTopic(createOpts)
 
-    var assignment = zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp, replicas) =>
-      tp.partition -> replicas
-    }
+    var assignment = adminClient.describeTopics(Collections.singletonList(testTopicName))
+      .allTopicNames().get().get(testTopicName).partitions()
+      .asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap
     checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
 
     val alteredNumPartitions = 36
@@ -376,14 +403,19 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
       "--partitions", alteredNumPartitions.toString,
       "--topic", testTopicName))
     topicService.alterTopic(alterOpts)
-    assignment = zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp, replicas) =>
-      tp.partition -> replicas
-    }
+
+    TestUtils.waitUntilTrue(
+      () => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == alteredNumPartitions),
+      "Timeout waiting new assignment propagate to broker")
+    assignment = adminClient.describeTopics(Collections.singletonList(testTopicName))
+      .allTopicNames().get().get(testTopicName).partitions()
+      .asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap
     checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
   }
 
-  @Test
-  def testConfigPreservationAcrossPartitionAlteration(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConfigPreservationAcrossPartitionAlteration(quorum: String): Unit = {
     val numPartitionsOriginal = 1
     val cleanupKey = "cleanup.policy"
     val cleanupVal = "compact"
@@ -395,25 +427,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
       "--config", cleanupKey + "=" + cleanupVal,
       "--topic", testTopicName))
     createAndWaitTopic(createOpts)
-    val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
-    assertTrue(props.containsKey(cleanupKey), "Properties after creation don't contain " + cleanupKey)
-    assertTrue(props.getProperty(cleanupKey).equals(cleanupVal), "Properties after creation have incorrect value")
+    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)
+    val props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
+    // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
+    assertNotNull(props.get(cleanupKey), "Properties after creation don't contain " + cleanupKey)
+    assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after creation have incorrect value")
 
     // pre-create the topic config changes path to avoid a NoNodeException
-    zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    if (!isKRaftTest()) {
+      zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    }
 
     // modify the topic to add new partitions
     val numPartitionsModified = 3
     val alterOpts = new TopicCommandOptions(
       Array("--partitions", numPartitionsModified.toString, "--topic", testTopicName))
     topicService.alterTopic(alterOpts)
-    val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
-    assertTrue(newProps.containsKey(cleanupKey), "Updated properties do not contain " + cleanupKey)
-    assertTrue(newProps.getProperty(cleanupKey).equals(cleanupVal), "Updated properties have incorrect value")
+    val newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
+    assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain " + cleanupKey)
+    assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated properties have incorrect value")
   }
 
-  @Test
-  def testTopicDeletion(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletion(quorum: String): Unit = {
     // create the NormalTopic
     val createOpts = new TopicCommandOptions(Array("--partitions", "1",
       "--replication-factor", "1",
@@ -423,14 +460,17 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
 
-    val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
-    assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
+    if (!isKRaftTest()) {
+      val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
+      assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
+    }
     topicService.deleteTopic(deleteOpts)
-    TestUtils.verifyTopicDeletion(zkClient, testTopicName, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers)
   }
 
-  @Test
-  def testDeleteInternalTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteInternalTopic(quorum: String): Unit = {
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", "1",
       "--replication-factor", "1",
@@ -443,25 +483,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     val deleteOffsetTopicOpts = new TopicCommandOptions(
       Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
     val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
-    assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.")
+    if (!isKRaftTest()) {
+      assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.")
+    }
     topicService.deleteTopic(deleteOffsetTopicOpts)
-    TestUtils.verifyTopicDeletion(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers)
   }
 
-  @Test
-  def testDeleteWhenTopicDoesntExist(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteWhenTopicDoesntExist(quorum: String): Unit = {
     // delete a topic that does not exist
     val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
     assertThrows(classOf[IllegalArgumentException], () => topicService.deleteTopic(deleteOpts))
   }
 
-  @Test
-  def testDeleteWhenTopicDoesntExistWithIfExists(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
     topicService.deleteTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
   }
 
-  @Test
-  def testDescribe(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribe(quorum: String): Unit = {
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get()
     waitForTopicCreated(testTopicName)
@@ -473,19 +518,22 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
   }
 
-  @Test
-  def testDescribeWhenTopicDoesntExist(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWhenTopicDoesntExist(quorum: String): Unit = {
     assertThrows(classOf[IllegalArgumentException],
       () => topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
   }
 
-  @Test
-  def testDescribeWhenTopicDoesntExistWithIfExists(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
     topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
   }
 
-  @Test
-  def testDescribeUnavailablePartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUnavailablePartitions(quorum: String): Unit = {
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, 6, 1.toShort))).all().get()
     waitForTopicCreated(testTopicName)
@@ -500,7 +548,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
 
       // wait until the topic metadata for the test topic is propagated to each alive broker
       TestUtils.waitUntilTrue(() => {
-        servers
+        brokers
           .filterNot(_.config.brokerId == 0)
           .foldLeft(true) {
             (result, server) => {
@@ -527,15 +575,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     }
   }
 
-  @Test
-  def testDescribeUnderReplicatedPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, 1, 6.toShort))).all().get()
     waitForTopicCreated(testTopicName)
 
     try {
       killBroker(0)
-      val aliveServers = servers.filterNot(_.config.brokerId == 0)
+      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
       TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -546,8 +595,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     }
   }
 
-  @Test
-  def testDescribeUnderMinIsrPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUnderMinIsrPartitions(quorum: String): Unit = {
     val configMap = new java.util.HashMap[String, String]()
     configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
 
@@ -557,7 +607,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
 
     try {
       killBroker(0)
-      val aliveServers = servers.filterNot(_.config.brokerId == 0)
+      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
       TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
@@ -568,8 +618,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     }
   }
 
-  @Test
-  def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: String): Unit = {
     val configMap = new java.util.HashMap[String, String]()
     val replicationFactor: Short = 1
     val partitions = 1
@@ -580,12 +631,12 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     waitForTopicCreated(testTopicName)
 
     // Produce multiple batches.
-    TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
-    TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+    TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
+    TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
 
     // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication
     // throughput so the reassignment doesn't complete quickly.
-    val brokerIds = servers.map(_.config.brokerId)
+    val brokerIds = brokers.map(_.config.brokerId)
     TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
 
     val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName)
@@ -622,8 +673,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     TestUtils.waitForAllReassignmentsToComplete(adminClient)
   }
 
-  @Test
-  def testDescribeAtMinIsrPartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeAtMinIsrPartitions(quorum: String): Unit = {
     val configMap = new java.util.HashMap[String, String]()
     configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
 
@@ -653,8 +705,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     *
     * Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition
     */
-  @Test
-  def testDescribeUnderMinIsrPartitionsMixed(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeUnderMinIsrPartitionsMixed(quorum: String): Unit = {
     val underMinIsrTopic = "under-min-isr-topic"
     val notUnderMinIsrTopic = "not-under-min-isr-topic"
     val offlineTopic = "offline-topic"
@@ -677,7 +730,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
 
     try {
       killBroker(0)
-      val aliveServers = servers.filterNot(_.config.brokerId == 0)
+      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
       TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
@@ -690,8 +743,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     }
   }
 
-  @Test
-  def testDescribeReportOverriddenConfigs(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeReportOverriddenConfigs(quorum: String): Unit = {
     val config = "file.delete.delay.ms=1000"
     createAndWaitTopic(new TopicCommandOptions(
       Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", config)))
@@ -700,8 +754,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertTrue(output.contains(config), s"Describe output should have contained $config")
   }
 
-  @Test
-  def testDescribeAndListTopicsWithoutInternalTopics(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeAndListTopicsWithoutInternalTopics(quorum: String): Unit = {
     createAndWaitTopic(
       new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
     // create a internal topic
@@ -720,8 +775,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
   }
 
-  @Test
-  def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(quorum: String): Unit = {
     adminClient = spy(adminClient)
     topicService = TopicService(adminClient)
 
@@ -746,8 +802,20 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
   }
 
-  @Test
-  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateWithTopicNameCollision(quorum: String): Unit = { // creating a topic whose name collides with an existing one must fail
+    adminClient.createTopics(
+      Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort))).all().get() // 1 partition, replication factor 6
+    waitForTopicCreated("foo_bar")
+
+    assertThrows(classOf[InvalidTopicException], // "foo.bar" differs from the existing "foo_bar" only by '.' vs '_'
+      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", "foo.bar"))))
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = {
     val adminClient = mock(classOf[Admin])
     val topicService = TopicService(adminClient)
 
@@ -766,8 +834,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     )
   }
 
-  @Test
-  def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = {
     val adminClient = mock(classOf[Admin])
     val topicService = TopicService(adminClient)
 
@@ -787,8 +856,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     )
   }
 
-  @Test
-  def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = {
     val adminClient = mock(classOf[Admin])
     val topicService = TopicService(adminClient)
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d8005e60ce..f104364d1a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -216,6 +216,23 @@ public class ReplicationControlManager {
      */
     private final TimelineHashMap<String, Uuid> topicsByName;
 
+    /**
+     * We try to prevent topics from being created if their names would collide with
+     * existing topics when periods in the topic name are replaced with underscores.
+     * The reason for this is that some per-topic metrics do replace periods with
+     * underscores, and would therefore be ambiguous otherwise.
+     *
+     * This map is from normalized topic name to a set of topic names. So if we had two
+     * topics named foo.bar and foo_bar this map would contain
+     * a mapping from foo_bar to a set containing foo.bar and foo_bar.
+     *
+     * Since we reject topic creations that would collide, under normal conditions the
+     * sets in this map should only have a size of 1. However, if the cluster was
+     * upgraded from a version prior to KAFKA-13743, it may be possible to have more
+     * values here, since colliding topic names will be "grandfathered in."
+     */
+    private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;
+
     /**
      * Maps topic UUIDs to structures containing topic information, including partitions.
      */
@@ -258,6 +275,7 @@ public class ReplicationControlManager {
         this.clusterControl = clusterControl;
         this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
         this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
         this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -266,6 +284,15 @@ public class ReplicationControlManager {
 
     public void replay(TopicRecord record) {
         topicsByName.put(record.name(), record.topicId());
+        if (Topic.hasCollisionChars(record.name())) {
+            String normalizedName = Topic.unifyCollisionChars(record.name());
+            TimelineHashSet<String> topicNames = topicsWithCollisionChars.get(normalizedName);
+            if (topicNames == null) {
+                topicNames = new TimelineHashSet<>(snapshotRegistry, 1);
+                topicsWithCollisionChars.put(normalizedName, topicNames);
+            }
+            topicNames.add(record.name());
+        }
         topics.put(record.topicId(),
             new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
         controllerMetrics.setGlobalTopicsCount(topics.size());
@@ -374,6 +401,16 @@ public class ReplicationControlManager {
                 " to remove.");
         }
         topicsByName.remove(topic.name);
+        if (Topic.hasCollisionChars(topic.name)) { // only names containing '.' or '_' are tracked in the collision map
+            String normalizedName = Topic.unifyCollisionChars(topic.name);
+            TimelineHashSet<String> colliding = topicsWithCollisionChars.get(normalizedName);
+            if (colliding != null) {
+                colliding.remove(topic.name);
+                if (colliding.isEmpty()) {
+                    topicsWithCollisionChars.remove(normalizedName); // map is keyed by the normalized name; a stale empty set would block re-creation
+                }
+            }
+        }
         reassigningTopics.remove(record.topicId());
 
         // Delete the configurations associated with this topic.
@@ -407,7 +444,7 @@ public class ReplicationControlManager {
         List<ApiMessageAndVersion> records = new ArrayList<>();
 
         // Check the topic names.
-        validateNewTopicNames(topicErrors, request.topics());
+        validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);
 
         // Identify topics that already exist and mark them with the appropriate error
         request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
@@ -598,7 +635,8 @@ public class ReplicationControlManager {
     }
 
     static void validateNewTopicNames(Map<String, ApiError> topicErrors,
-                                      CreatableTopicCollection topics) {
+                                      CreatableTopicCollection topics,
+                                      Map<String, ? extends Set<String>> topicsWithCollisionChars) {
         for (CreatableTopic topic : topics) {
             if (topicErrors.containsKey(topic.name())) continue;
             try {
@@ -607,6 +645,15 @@ public class ReplicationControlManager {
                 topicErrors.put(topic.name(),
                     new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
             }
+            if (Topic.hasCollisionChars(topic.name())) {
+                String normalizedName = Topic.unifyCollisionChars(topic.name());
+                Set<String> colliding = topicsWithCollisionChars.get(normalizedName);
+                if (colliding != null) {
+                    topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
+                        "Topic '" + topic.name() + "' collides with existing topic: " +
+                            colliding.iterator().next()));
+                }
+            }
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 108f2eca66..d095a9fe37 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -641,7 +642,7 @@ public class ReplicationControlManagerTest {
         topics.add(new CreatableTopic().setName(""));
         topics.add(new CreatableTopic().setName("woo"));
         topics.add(new CreatableTopic().setName("."));
-        ReplicationControlManager.validateNewTopicNames(topicErrors, topics);
+        ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap());
         Map<String, ApiError> expectedTopicErrors = new HashMap<>();
         expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
             "Topic name is illegal, it can't be empty"));
@@ -650,6 +651,24 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedTopicErrors, topicErrors);
     }
 
+    @Test
+    public void testTopicNameCollision() { // validateNewTopicNames must reject names that collide with existing topics
+        Map<String, ApiError> topicErrors = new HashMap<>();
+        CreatableTopicCollection topics = new CreatableTopicCollection();
+        topics.add(new CreatableTopic().setName("foo.bar"));
+        topics.add(new CreatableTopic().setName("woo.bar_foo"));
+        Map<String, Set<String>> collisionMap = new HashMap<>(); // normalized name -> names of already-existing topics
+        collisionMap.put("foo_bar", new TreeSet<>(Arrays.asList("foo_bar")));
+        collisionMap.put("woo_bar_foo", new TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));
+        ReplicationControlManager.validateNewTopicNames(topicErrors, topics, collisionMap);
+        Map<String, ApiError> expectedTopicErrors = new HashMap<>();
+        expectedTopicErrors.put("foo.bar", new ApiError(INVALID_TOPIC_EXCEPTION,
+            "Topic 'foo.bar' collides with existing topic: foo_bar"));
+        expectedTopicErrors.put("woo.bar_foo", new ApiError(INVALID_TOPIC_EXCEPTION, // sorted TreeSet iterates "woo.bar.foo" first
+            "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
+        assertEquals(expectedTopicErrors, topicErrors);
+    }
+
     @Test
     public void testRemoveLeaderships() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();