You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/21 23:49:07 UTC
[kafka] branch 1.1 updated: KAFKA-7082: Concurrent create topics
may throw NodeExistsException (#5259)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new ae3dd56 KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
ae3dd56 is described below
commit ae3dd56682b6b58409563b477b3bf19aaeb4d353
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 21 16:47:44 2018 -0700
KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
This is an unexpected exception so `UnknownServerException`
is thrown back to the client.
This is a minimal change to make the behaviour match `ZkUtils`.
This is better, but one could argue that it's not perfect. A more
sophisticated approach can be tackled separately.
Added a concurrent test that fails without this change.
Reviewers: Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/zk/AdminZkClient.scala | 1 -
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 15 ++++++++++---
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 26 +++++++++++++++++++++-
3 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 6d7df3f..ffe3cd2 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
update: Boolean = false) {
validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
- // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index a7037c7..a7ba5ac 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
/**
* Sets or creates the entity znode path with the given configs depending
* on whether it already exists or not.
+ *
+ * If this is method is called concurrently, the last writer wins. In cases where we update configs and then
+ * partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the
+ * partition assignment. As such, the recommendation is to never call create topic for the same topic with different
+ * configs/partition assignment concurrently.
+ *
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
@@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
retryRequestUntilConnected(setDataRequest)
}
- def create(configData: Array[Byte]) = {
+ def createOrSet(configData: Array[Byte]): Unit = {
val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
- createRecursive(path, ConfigEntityZNode.encode(config))
+ try createRecursive(path, ConfigEntityZNode.encode(config))
+ catch {
+ case _: NodeExistsException => set(configData).maybeThrow
+ }
}
val configData = ConfigEntityZNode.encode(config)
val setDataResponse = set(configData)
setDataResponse.resultCode match {
- case Code.NONODE => create(configData)
+ case Code.NONODE => createOrSet(configData)
case _ => setDataResponse.maybeThrow
}
}
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index fe5fbff..39745e5 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -28,8 +28,10 @@ import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Test}
@@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
}
@Test
- def testConcurrentTopicCreation() {
+ def testMockedConcurrentTopicCreation() {
val topic = "test.topic"
// simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
@@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
}
}
+ @Test
+ def testConcurrentTopicCreation() {
+ val topic = "test-concurrent-topic-creation"
+ TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+ val props = new Properties
+ props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
+ def createTopic(): Unit = {
+ try adminZkClient.createTopic(topic, 3, 1, props)
+ catch { case _: TopicExistsException => () }
+ val (_, partitionAssignment) = zkClient.getPartitionAssignmentForTopics(Set(topic)).head
+ assertEquals(3, partitionAssignment.size)
+ partitionAssignment.foreach { case (partition, replicas) =>
+ assertEquals(s"Unexpected replication factor for $partition", 1, replicas.size)
+ }
+ val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic)
+ assertEquals(props, savedProps)
+ }
+
+ TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(createTopic, createTopic),
+ JTestUtils.DEFAULT_MAX_WAIT_MS.toInt)
+ }
+
/**
* This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
* then changes the config and checks that the new values take effect.