You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/04/13 18:59:45 UTC
[kafka] branch trunk updated: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87aa8259dd KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910
87aa8259dd is described below
commit 87aa8259ddf5d4e5ed75aa41c4ba2ad65e2624a6
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Mar 17 16:25:45 2022 +0800
KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910
In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names. We should implement this in KRaft mode. This PR also changes TopicCommandIntegrationTest to
support KRaft mode.
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../org/apache/kafka/common/internals/Topic.java | 13 +-
.../apache/kafka/common/internals/TopicTest.java | 9 +
.../kafka/admin/TopicCommandIntegrationTest.scala | 306 +++++++++++++--------
.../controller/ReplicationControlManager.java | 51 +++-
.../controller/ReplicationControlManagerTest.java | 21 +-
5 files changed, 278 insertions(+), 122 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index 7a5fefb3d9..3c93ef87b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -67,6 +67,17 @@ public class Topic {
return topic.contains("_") || topic.contains(".");
}
+ /**
+ * Unify topic name with a period ('.') or underscore ('_'), this is only used to check collision and will not
+ * be used to really change topic name.
+ *
+ * @param topic A topic to unify
+ * @return A unified topic name
+ */
+ public static String unifyCollisionChars(String topic) {
+ return topic.replace('.', '_');
+ }
+
/**
* Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position.
*
@@ -75,7 +86,7 @@ public class Topic {
* @return true if the topics collide
*/
public static boolean hasCollision(String topicA, String topicB) {
- return topicA.replace('.', '_').equals(topicB.replace('.', '_'));
+ return unifyCollisionChars(topicA).equals(unifyCollisionChars(topicB));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
index 9bf237fb1b..03c0811fa4 100644
--- a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -81,6 +82,14 @@ public class TopicTest {
assertTrue(Topic.hasCollisionChars(topic));
}
+ @Test
+ public void testUnifyCollisionChars() {
+ assertEquals("topic", Topic.unifyCollisionChars("topic"));
+ assertEquals("_topic", Topic.unifyCollisionChars(".topic"));
+ assertEquals("_topic", Topic.unifyCollisionChars("_topic"));
+ assertEquals("__topic", Topic.unifyCollisionChars("_.topic"));
+ }
+
@Test
public void testTopicHasCollision() {
List<String> periodFirstMiddleLastNone = Arrays.asList(".topic", "to.pic", "topic.", "topic");
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 9a1fe378f6..ee7e64957e 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -17,23 +17,24 @@
package kafka.admin
import java.util.{Collection, Collections, Optional, Properties}
-
import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService}
import kafka.integration.KafkaServerTestHarness
-import kafka.server.{ConfigType, KafkaConfig}
-import kafka.utils.{Logging, TestUtils}
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, ThrottlingQuotaExceededException, TopicExistsException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, ThrottlingQuotaExceededException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.{eq => eqThat, _}
import org.mockito.Mockito._
@@ -54,7 +55,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
*/
override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(
numConfigs = 6,
- zkConnect = zkConnect,
+ zkConnect = zkConnectOrNull,
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor,
@@ -76,7 +77,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
}
private[this] def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = {
- TestUtils.waitForPartitionMetadata(servers, topicName, partition = 0, timeout)
+ TestUtils.waitForPartitionMetadata(brokers, topicName, partition = 0, timeout)
}
@BeforeEach
@@ -98,16 +99,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
topicService.close()
}
- @Test
- def testCreate(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreate(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "1", "--topic", testTopicName)))
adminClient.listTopics().names().get().contains(testTopicName)
}
- @Test
- def testCreateWithDefaults(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithDefaults(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(Array("--topic", testTopicName)))
val partitions = adminClient
@@ -120,8 +123,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
}
- @Test
- def testCreateWithDefaultReplication(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithDefaultReplication(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "2")))
@@ -135,8 +139,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
}
- @Test
- def testCreateWithDefaultPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithDefaultPartitions(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replication-factor", "2")))
@@ -151,8 +156,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertEquals(partitions.get(0).replicas().size(), 2)
}
- @Test
- def testCreateWithConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithConfigs(quorum: String): Unit = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", "delete.retention.ms=1000")))
@@ -163,8 +169,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()))
}
- @Test
- def testCreateWhenAlreadyExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWhenAlreadyExists(quorum: String): Unit = {
val numPartitions = 1
// create the topic
@@ -176,15 +183,17 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertThrows(classOf[TopicExistsException], () => topicService.createTopic(createOpts))
}
- @Test
- def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWhenAlreadyExistsWithIfNotExists(quorum: String): Unit = {
val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
createAndWaitTopic(createOpts)
topicService.createTopic(createOpts)
}
- @Test
- def testCreateWithReplicaAssignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithReplicaAssignment(quorum: String): Unit = {
// create the topic
val createOpts = new TopicCommandOptions(
Array("--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName))
@@ -202,37 +211,42 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertEquals(List(1, 0), partitions.get(2).replicas().asScala.map(_.id()))
}
- @Test
- def testCreateWithInvalidReplicationFactor(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithInvalidReplicationFactor(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", (Short.MaxValue+1).toString, "--topic", testTopicName))))
}
- @Test
- def testCreateWithNegativeReplicationFactor(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithNegativeReplicationFactor(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName))))
}
- @Test
- def testCreateWithNegativePartitionCount(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithNegativePartitionCount(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName))))
}
- @Test
- def testInvalidTopicLevelConfig(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidTopicLevelConfig(quorum: String): Unit = {
val createOpts = new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName,
"--config", "message.timestamp.type=boom"))
assertThrows(classOf[ConfigException], () => topicService.createTopic(createOpts))
}
- @Test
- def testListTopics(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTopics(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
@@ -242,8 +256,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertTrue(output.contains(testTopicName))
}
- @Test
- def testListTopicsWithIncludeList(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTopicsWithIncludeList(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
val topic2 = "kafka.testTopic2"
val topic3 = "oooof.testTopic1"
@@ -264,8 +279,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertFalse(output.contains(topic3))
}
- @Test
- def testListTopicsWithExcludeInternal(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTopicsWithExcludeInternal(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
adminClient.createTopics(
List(new NewTopic(topic1, 2, 2.toShort),
@@ -280,8 +296,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}
- @Test
- def testAlterPartitionCount(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterPartitionCount(quorum: String): Unit = {
adminClient.createTopics(
List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get()
waitForTopicCreated(testTopicName)
@@ -289,12 +306,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
+ TestUtils.waitUntilTrue(
+ () => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
+ "Timeout waiting new assignment propagate to broker")
val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
}
- @Test
- def testAlterAssignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterAssignment(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get()
waitForTopicCreated(testTopicName)
@@ -307,8 +328,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertEquals(List(4,2), topicDescription.partitions().get(2).replicas().asScala.map(_.id()))
}
- @Test
- def testAlterAssignmentWithMoreAssignmentThanPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): Unit = {
adminClient.createTopics(
List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get()
waitForTopicCreated(testTopicName)
@@ -318,8 +340,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3"))))
}
- @Test
- def testAlterAssignmentWithMorePartitionsThanAssignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): Unit = {
adminClient.createTopics(
List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get()
waitForTopicCreated(testTopicName)
@@ -329,8 +352,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6"))))
}
- @Test
- def testAlterWithInvalidPartitionCount(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterWithInvalidPartitionCount(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
@@ -339,22 +363,25 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
Array("--partitions", "-1", "--topic", testTopicName))))
}
- @Test
- def testAlterWhenTopicDoesntExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterWhenTopicDoesntExist(quorum: String): Unit = {
// alter a topic that does not exist without --if-exists
val alterOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--partitions", "1"))
val topicService = TopicService(adminClient)
assertThrows(classOf[IllegalArgumentException], () => topicService.alterTopic(alterOpts))
}
- @Test
- def testAlterWhenTopicDoesntExistWithIfExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "1", "--if-exists")))
}
- @Test
- def testCreateAlterTopicWithRackAware(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateAlterTopicWithRackAware(quorum: String): Unit = {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
val numPartitions = 18
@@ -365,9 +392,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
"--topic", testTopicName))
createAndWaitTopic(createOpts)
- var assignment = zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp, replicas) =>
- tp.partition -> replicas
- }
+ var assignment = adminClient.describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions()
+ .asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
val alteredNumPartitions = 36
@@ -376,14 +403,19 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
"--partitions", alteredNumPartitions.toString,
"--topic", testTopicName))
topicService.alterTopic(alterOpts)
- assignment = zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp, replicas) =>
- tp.partition -> replicas
- }
+
+ TestUtils.waitUntilTrue(
+ () => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == alteredNumPartitions),
+ "Timeout waiting new assignment propagate to broker")
+ assignment = adminClient.describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions()
+ .asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
}
- @Test
- def testConfigPreservationAcrossPartitionAlteration(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConfigPreservationAcrossPartitionAlteration(quorum: String): Unit = {
val numPartitionsOriginal = 1
val cleanupKey = "cleanup.policy"
val cleanupVal = "compact"
@@ -395,25 +427,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
"--config", cleanupKey + "=" + cleanupVal,
"--topic", testTopicName))
createAndWaitTopic(createOpts)
- val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
- assertTrue(props.containsKey(cleanupKey), "Properties after creation don't contain " + cleanupKey)
- assertTrue(props.getProperty(cleanupKey).equals(cleanupVal), "Properties after creation have incorrect value")
+ val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)
+ val props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
+ // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
+ assertNotNull(props.get(cleanupKey), "Properties after creation don't contain " + cleanupKey)
+ assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after creation have incorrect value")
// pre-create the topic config changes path to avoid a NoNodeException
- zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+ if (!isKRaftTest()) {
+ zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+ }
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(
Array("--partitions", numPartitionsModified.toString, "--topic", testTopicName))
topicService.alterTopic(alterOpts)
- val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
- assertTrue(newProps.containsKey(cleanupKey), "Updated properties do not contain " + cleanupKey)
- assertTrue(newProps.getProperty(cleanupKey).equals(cleanupVal), "Updated properties have incorrect value")
+ val newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
+ assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain " + cleanupKey)
+ assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated properties have incorrect value")
}
- @Test
- def testTopicDeletion(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTopicDeletion(quorum: String): Unit = {
// create the NormalTopic
val createOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
@@ -423,14 +460,17 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
- val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
- assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
+ if (!isKRaftTest()) {
+ val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
+ assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
+ }
topicService.deleteTopic(deleteOpts)
- TestUtils.verifyTopicDeletion(zkClient, testTopicName, 1, servers)
+ TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers)
}
- @Test
- def testDeleteInternalTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteInternalTopic(quorum: String): Unit = {
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
@@ -443,25 +483,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
val deleteOffsetTopicOpts = new TopicCommandOptions(
Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
- assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.")
+ if (!isKRaftTest()) {
+ assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.")
+ }
topicService.deleteTopic(deleteOffsetTopicOpts)
- TestUtils.verifyTopicDeletion(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, 1, servers)
+ TestUtils.verifyTopicDeletion(zkClientOrNull, Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers)
}
- @Test
- def testDeleteWhenTopicDoesntExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWhenTopicDoesntExist(quorum: String): Unit = {
// delete a topic that does not exist
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
assertThrows(classOf[IllegalArgumentException], () => topicService.deleteTopic(deleteOpts))
}
- @Test
- def testDeleteWhenTopicDoesntExistWithIfExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.deleteTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
}
- @Test
- def testDescribe(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribe(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get()
waitForTopicCreated(testTopicName)
@@ -473,19 +518,22 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
}
- @Test
- def testDescribeWhenTopicDoesntExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWhenTopicDoesntExist(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
}
- @Test
- def testDescribeWhenTopicDoesntExistWithIfExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
}
- @Test
- def testDescribeUnavailablePartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnavailablePartitions(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 6, 1.toShort))).all().get()
waitForTopicCreated(testTopicName)
@@ -500,7 +548,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
// wait until the topic metadata for the test topic is propagated to each alive broker
TestUtils.waitUntilTrue(() => {
- servers
+ brokers
.filterNot(_.config.brokerId == 0)
.foldLeft(true) {
(result, server) => {
@@ -527,15 +575,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeUnderReplicatedPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1, 6.toShort))).all().get()
waitForTopicCreated(testTopicName)
try {
killBroker(0)
- val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ val aliveServers = brokers.filterNot(_.config.brokerId == 0)
TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -546,8 +595,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeUnderMinIsrPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderMinIsrPartitions(quorum: String): Unit = {
val configMap = new java.util.HashMap[String, String]()
configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
@@ -557,7 +607,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ val aliveServers = brokers.filterNot(_.config.brokerId == 0)
TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
@@ -568,8 +618,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: String): Unit = {
val configMap = new java.util.HashMap[String, String]()
val replicationFactor: Short = 1
val partitions = 1
@@ -580,12 +631,12 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
waitForTopicCreated(testTopicName)
// Produce multiple batches.
- TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
- TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+ TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
+ TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
// Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication
// throughput so the reassignment doesn't complete quickly.
- val brokerIds = servers.map(_.config.brokerId)
+ val brokerIds = brokers.map(_.config.brokerId)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName)
@@ -622,8 +673,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}
- @Test
- def testDescribeAtMinIsrPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAtMinIsrPartitions(quorum: String): Unit = {
val configMap = new java.util.HashMap[String, String]()
configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
@@ -653,8 +705,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
*
* Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition
*/
- @Test
- def testDescribeUnderMinIsrPartitionsMixed(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderMinIsrPartitionsMixed(quorum: String): Unit = {
val underMinIsrTopic = "under-min-isr-topic"
val notUnderMinIsrTopic = "not-under-min-isr-topic"
val offlineTopic = "offline-topic"
@@ -677,7 +730,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ val aliveServers = brokers.filterNot(_.config.brokerId == 0)
TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
@@ -690,8 +743,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeReportOverriddenConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeReportOverriddenConfigs(quorum: String): Unit = {
val config = "file.delete.delay.ms=1000"
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", config)))
@@ -700,8 +754,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertTrue(output.contains(config), s"Describe output should have contained $config")
}
- @Test
- def testDescribeAndListTopicsWithoutInternalTopics(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAndListTopicsWithoutInternalTopics(quorum: String): Unit = {
createAndWaitTopic(
new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
// create a internal topic
@@ -720,8 +775,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}
- @Test
- def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(quorum: String): Unit = {
adminClient = spy(adminClient)
topicService = TopicService(adminClient)
@@ -746,8 +802,20 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
}
- @Test
- def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithTopicNameCollision(quorum: String): Unit = {
+ adminClient.createTopics(
+ Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort))).all().get()
+ waitForTopicCreated("foo_bar")
+
+ assertThrows(classOf[InvalidTopicException],
+ () => topicService.createTopic(new TopicCommandOptions(Array("--topic", "foo.bar"))))
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
@@ -766,8 +834,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
)
}
- @Test
- def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
@@ -787,8 +856,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
)
}
- @Test
- def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d8005e60ce..f104364d1a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -216,6 +216,23 @@ public class ReplicationControlManager {
*/
private final TimelineHashMap<String, Uuid> topicsByName;
+ /**
+ * We try to prevent topics from being created if their names would collide with
+ * existing topics when periods in the topic name are replaced with underscores.
+ * The reason for this is that some per-topic metrics do replace periods with
+ * underscores, and would therefore be ambiguous otherwise.
+ *
+ * This map is from normalized topic name to a set of topic names. So if we had two
+ * topics named foo.bar and foo_bar this map would contain
+ * a mapping from foo_bar to a set containing foo.bar and foo_bar.
+ *
+ * Since we reject topic creations that would collide, under normal conditions the
+ * sets in this map should only have a size of 1. However, if the cluster was
+ * upgraded from a version prior to KAFKA-13743, it may be possible to have more
+ * values here, since collidiing topic names will be "grandfathered in."
+ */
+ private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;
+
/**
* Maps topic UUIDs to structures containing topic information, including partitions.
*/
@@ -258,6 +275,7 @@ public class ReplicationControlManager {
this.clusterControl = clusterControl;
this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -266,6 +284,15 @@ public class ReplicationControlManager {
public void replay(TopicRecord record) {
topicsByName.put(record.name(), record.topicId());
+ if (Topic.hasCollisionChars(record.name())) {
+ String normalizedName = Topic.unifyCollisionChars(record.name());
+ TimelineHashSet<String> topicNames = topicsWithCollisionChars.get(normalizedName);
+ if (topicNames == null) {
+ topicNames = new TimelineHashSet<>(snapshotRegistry, 1);
+ topicsWithCollisionChars.put(normalizedName, topicNames);
+ }
+ topicNames.add(record.name());
+ }
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
controllerMetrics.setGlobalTopicsCount(topics.size());
@@ -374,6 +401,16 @@ public class ReplicationControlManager {
" to remove.");
}
topicsByName.remove(topic.name);
+ if (Topic.hasCollisionChars(topic.name)) {
+ String normalizedName = Topic.unifyCollisionChars(topic.name);
+ TimelineHashSet<String> colliding = topicsWithCollisionChars.get(normalizedName);
+ if (colliding != null) {
+ colliding.remove(topic.name);
+ if (colliding.isEmpty()) {
+ topicsWithCollisionChars.remove(topic.name);
+ }
+ }
+ }
reassigningTopics.remove(record.topicId());
// Delete the configurations associated with this topic.
@@ -407,7 +444,7 @@ public class ReplicationControlManager {
List<ApiMessageAndVersion> records = new ArrayList<>();
// Check the topic names.
- validateNewTopicNames(topicErrors, request.topics());
+ validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);
// Identify topics that already exist and mark them with the appropriate error
request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
@@ -598,7 +635,8 @@ public class ReplicationControlManager {
}
static void validateNewTopicNames(Map<String, ApiError> topicErrors,
- CreatableTopicCollection topics) {
+ CreatableTopicCollection topics,
+ Map<String, ? extends Set<String>> topicsWithCollisionChars) {
for (CreatableTopic topic : topics) {
if (topicErrors.containsKey(topic.name())) continue;
try {
@@ -607,6 +645,15 @@ public class ReplicationControlManager {
topicErrors.put(topic.name(),
new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
}
+ if (Topic.hasCollisionChars(topic.name())) {
+ String normalizedName = Topic.unifyCollisionChars(topic.name());
+ Set<String> colliding = topicsWithCollisionChars.get(normalizedName);
+ if (colliding != null) {
+ topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
+ "Topic '" + topic.name() + "' collides with existing topic: " +
+ colliding.iterator().next()));
+ }
+ }
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 108f2eca66..d095a9fe37 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
import java.util.Collections;
@@ -641,7 +642,7 @@ public class ReplicationControlManagerTest {
topics.add(new CreatableTopic().setName(""));
topics.add(new CreatableTopic().setName("woo"));
topics.add(new CreatableTopic().setName("."));
- ReplicationControlManager.validateNewTopicNames(topicErrors, topics);
+ ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap());
Map<String, ApiError> expectedTopicErrors = new HashMap<>();
expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
"Topic name is illegal, it can't be empty"));
@@ -650,6 +651,24 @@ public class ReplicationControlManagerTest {
assertEquals(expectedTopicErrors, topicErrors);
}
+ @Test
+ public void testTopicNameCollision() {
+ Map<String, ApiError> topicErrors = new HashMap<>();
+ CreatableTopicCollection topics = new CreatableTopicCollection();
+ topics.add(new CreatableTopic().setName("foo.bar"));
+ topics.add(new CreatableTopic().setName("woo.bar_foo"));
+ Map<String, Set<String>> collisionMap = new HashMap<>();
+ collisionMap.put("foo_bar", new TreeSet<>(Arrays.asList("foo_bar")));
+ collisionMap.put("woo_bar_foo", new TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));
+ ReplicationControlManager.validateNewTopicNames(topicErrors, topics, collisionMap);
+ Map<String, ApiError> expectedTopicErrors = new HashMap<>();
+ expectedTopicErrors.put("foo.bar", new ApiError(INVALID_TOPIC_EXCEPTION,
+ "Topic 'foo.bar' collides with existing topic: foo_bar"));
+ expectedTopicErrors.put("woo.bar_foo", new ApiError(INVALID_TOPIC_EXCEPTION,
+ "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
+ assertEquals(expectedTopicErrors, topicErrors);
+ }
+
@Test
public void testRemoveLeaderships() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();