You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/21 23:47:55 UTC

[kafka] branch trunk updated: KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e33ccb6  KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
e33ccb6 is described below

commit e33ccb628efd9a039d2ad9f9ca74f001bc682fcd
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 21 16:47:44 2018 -0700

    KAFKA-7082: Concurrent create topics may throw NodeExistsException (#5259)
    
    `NodeExistsException` is an unexpected exception here, so
    `UnknownServerException` is thrown back to the client.
    
    This is a minimal change to make the behaviour match `ZkUtils`.
    This is better, but one could argue that it's not perfect. A more
    sophisticated approach can be tackled separately.
    
    Added a concurrent test that fails without this change.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  1 -
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 15 ++++++++++---
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    | 26 +++++++++++++++++++++-
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 8a6b3ee..060c0b4 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                                                      update: Boolean = false) {
     validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
 
-    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
     if (!update) {
       // write out the config if there is any, this isn't transactional with the partition assignments
       zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index bb34294..ec4932a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   /**
    * Sets or creates the entity znode path with the given configs depending
    * on whether it already exists or not.
+   *
+   * If this method is called concurrently, the last writer wins. In cases where we update configs and then
+   * partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the
+   * partition assignment. As such, the recommendation is to never call create topic for the same topic with different
+   * configs/partition assignment concurrently.
+   *
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
@@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
       retryRequestUntilConnected(setDataRequest)
     }
 
-    def create(configData: Array[Byte]) = {
+    def createOrSet(configData: Array[Byte]): Unit = {
       val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
-      createRecursive(path, ConfigEntityZNode.encode(config))
+      try createRecursive(path, ConfigEntityZNode.encode(config))
+      catch {
+        case _: NodeExistsException => set(configData).maybeThrow
+      }
     }
 
     val configData = ConfigEntityZNode.encode(config)
 
     val setDataResponse = set(configData)
     setDataResponse.resultCode match {
-      case Code.NONODE => create(configData)
+      case Code.NONODE => createOrSet(configData)
       case _ => setDataResponse.maybeThrow
     }
   }
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index fe5fbff..39745e5 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -28,8 +28,10 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
   }
 
   @Test
-  def testConcurrentTopicCreation() {
+  def testMockedConcurrentTopicCreation() {
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
@@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     }
   }
 
+  @Test
+  def testConcurrentTopicCreation() {
+    val topic = "test-concurrent-topic-creation"
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    val props = new Properties
+    props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
+    def createTopic(): Unit = {
+      try adminZkClient.createTopic(topic, 3, 1, props)
+      catch { case _: TopicExistsException => () }
+      val (_, partitionAssignment) = zkClient.getPartitionAssignmentForTopics(Set(topic)).head
+      assertEquals(3, partitionAssignment.size)
+      partitionAssignment.foreach { case (partition, replicas) =>
+        assertEquals(s"Unexpected replication factor for $partition", 1, replicas.size)
+      }
+      val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic)
+      assertEquals(props, savedProps)
+    }
+
+    TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(createTopic, createTopic),
+      JTestUtils.DEFAULT_MAX_WAIT_MS.toInt)
+  }
+
   /**
    * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
    * then changes the config and checks that the new values take effect.