Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/06/13 08:30:13 UTC

[GitHub] [kafka] dajac opened a new pull request, #13848: MINOR: Use admin client to create offsets topic

dajac opened a new pull request, #13848:
URL: https://github.com/apache/kafka/pull/13848

   I have seen failures like the following in a few builds:
   
   ```
   Build / JDK 11 and Scala 2.13 / testDescribeSimpleConsumerGroup() – kafka.admin.DescribeConsumerGroupTest
   org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists.
   ```
   
   Many tests still use `TestUtils.createOffsetsTopic(zkClient, servers)` to create the offsets topic. This method does not handle the case where the topic already exists (e.g. when the creation is retried). Rather than fixing it, I think it is better to use the admin client variant, which is more reliable.
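
A minimal sketch of what "handling the case where the topic exists" could look like, assuming a purely in-memory stand-in for the cluster. `FakeCluster`, `TopicLayout`, `TopicAlreadyExists` and `ensureTopic` are hypothetical names for illustration only and are not Kafka or `TestUtils` APIs.

```scala
// Hypothetical in-memory model; none of these names are Kafka APIs.
object IdempotentCreateSketch {
  final case class TopicLayout(numPartitions: Int, replicationFactor: Int)

  final class TopicAlreadyExists(name: String)
    extends RuntimeException(s"Topic '$name' already exists.")

  // Stand-in for the cluster's topic metadata.
  final class FakeCluster {
    private val topics = scala.collection.mutable.Map.empty[String, TopicLayout]

    def create(name: String, layout: TopicLayout): Unit = {
      if (topics.contains(name)) throw new TopicAlreadyExists(name)
      topics.update(name, layout)
    }

    def describe(name: String): Option[TopicLayout] = topics.get(name)
  }

  // Creates the topic, or accepts an existing one only if its layout matches.
  // A mismatching layout rethrows the original "already exists" error.
  def ensureTopic(cluster: FakeCluster, name: String, expected: TopicLayout): Unit = {
    try cluster.create(name, expected)
    catch {
      case e: TopicAlreadyExists =>
        if (!cluster.describe(name).contains(expected)) throw e
    }
  }
}
```

With a helper shaped like this, calling `ensureTopic` twice with the same layout is harmless, while a second call with a different layout still fails loudly.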
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1231007534


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -611,11 +611,33 @@ object TestUtils extends Logging {
     */
   def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
-    createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
-      server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
-      server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      server.groupCoordinator.groupMetadataTopicConfigs)
+    val numPartitions = server.config.offsetsTopicPartitions
+    val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+
+    try {
+      createTopic(
+        zkClient,
+        Topic.GROUP_METADATA_TOPIC_NAME,
+        numPartitions,
+        replicationFactor,
+        servers,
+        server.groupCoordinator.groupMetadataTopicConfigs
+      )
+    } catch {
+      case ex: TopicExistsException =>
+        val allPartitionsMetadata = waitForAllPartitionsMetadata(
+          servers,
+          Topic.GROUP_METADATA_TOPIC_NAME,
+          numPartitions
+        )
+
+        // If the topic already exists, we ensure that it has the required
+        // number of partitions and replication factor. If it has not, the
+        // exception is thrown further.
+        if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {

Review Comment:
   We may be using the same approach for KRaft mode, but do you agree that re-using existing topic artifacts is brittle? Even if this hasn't caused problems until today, it could cause problems in the future. So, while we are fixing it, why not fix it properly? The cost of checking the end offset is very small: we just have to ensure that it is zero.
   
   Alternatively, if you wish to keep the behaviour consistent with the KRaft version, could you please update the Javadoc for this method to mention that, if a topic already exists with the same number of partitions, we may retain its data?
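
The end-offset check suggested here can be sketched as a pure function over per-partition end offsets. This is only an illustration: `EndOffsets`, `nonEmptyPartitions` and `isReusable` are hypothetical names, and how the offsets would be fetched from the brokers is deliberately left out.

```scala
object EmptyTopicCheckSketch {
  // Hypothetical view of a topic: partition id -> log end offset.
  type EndOffsets = Map[Int, Long]

  // Partitions that already contain records (end offset greater than zero).
  def nonEmptyPartitions(endOffsets: EndOffsets): Seq[Int] =
    endOffsets.collect { case (partition, offset) if offset > 0L => partition }.toSeq.sorted

  // True only when every expected partition is present and has no records.
  def isReusable(endOffsets: EndOffsets, numPartitions: Int): Boolean =
    endOffsets.keySet == (0 until numPartitions).toSet &&
      nonEmptyPartitions(endOffsets).isEmpty
}
```

A caller would treat an existing topic as acceptable only when `isReusable` returns true, and rethrow the original exception otherwise.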





[GitHub] [kafka] dajac commented on pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13848:
URL: https://github.com/apache/kafka/pull/13848#issuecomment-1596594750

   > Btw, @dajac do we typically autocreate this topic when we are not testing? Just curious because I've seen some issues with __transaction_state not having enough replicas in tests (taking 10 or so seconds to get a valid ISR). Just wondering if pre-creating the topic is a valid approach for tests.
   
   @jolshan Correct. The topic is automatically created when it is needed. My understanding is that we pre-create it in tests in order to reduce flakiness.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1230801748


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -611,11 +611,33 @@ object TestUtils extends Logging {
     */
   def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
-    createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
-      server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
-      server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      server.groupCoordinator.groupMetadataTopicConfigs)
+    val numPartitions = server.config.offsetsTopicPartitions
+    val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+
+    try {
+      createTopic(
+        zkClient,
+        Topic.GROUP_METADATA_TOPIC_NAME,
+        numPartitions,
+        replicationFactor,
+        servers,
+        server.groupCoordinator.groupMetadataTopicConfigs
+      )
+    } catch {
+      case ex: TopicExistsException =>
+        val allPartitionsMetadata = waitForAllPartitionsMetadata(
+          servers,
+          Topic.GROUP_METADATA_TOPIC_NAME,
+          numPartitions
+        )
+
+        // If the topic already exists, we ensure that it has the required
+        // number of partitions and replication factor. If it has not, the
+        // exception is thrown further.
+        if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {

Review Comment:
   Is this sufficient validation? The topics might be leaked artifacts from other tests / retries and might contain different data than what is expected. So, we should probably validate that all partitions are empty as well.
   
   As an alternative solution, can we delete the topic if it already exists here and re-create it?
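
The delete-and-re-create alternative could look roughly like the following, again against a hypothetical in-memory `FakeCluster` rather than real brokers. All names here are illustrative assumptions.

```scala
object RecreateTopicSketch {
  // Hypothetical stand-in for a cluster: topic name -> records per partition.
  final class FakeCluster {
    private val topics =
      scala.collection.mutable.Map.empty[String, Vector[Vector[String]]]

    def exists(name: String): Boolean = topics.contains(name)

    def delete(name: String): Unit = { topics.remove(name); () }

    def create(name: String, numPartitions: Int): Unit = {
      require(!exists(name), s"Topic '$name' already exists.")
      topics.update(name, Vector.fill(numPartitions)(Vector.empty[String]))
    }

    def append(name: String, partition: Int, record: String): Unit = {
      val partitions = topics(name)
      topics.update(name, partitions.updated(partition, partitions(partition) :+ record))
    }

    def recordCount(name: String): Int = topics(name).map(_.size).sum
  }

  // Drops any leftover topic first, so the caller always starts from empty partitions.
  def recreateTopic(cluster: FakeCluster, name: String, numPartitions: Int): Unit = {
    if (cluster.exists(name)) cluster.delete(name)
    cluster.create(name, numPartitions)
  }
}
```

In a real cluster, topic deletion is asynchronous, so a helper like this would also need to wait until the topic is gone before re-creating it. The in-memory model above skips that step.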





[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Use admin client to create offsets topic in tests

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1228424644


##########
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##########
@@ -150,12 +150,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     listenerName: ListenerName = listenerName,
     adminClientConfig: Properties = new Properties
   ): Unit = {
-    if (isKRaftTest()) {
-      resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
-        TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
-      }
-    } else {
-      TestUtils.createOffsetsTopic(zkClient, servers)

Review Comment:
   Ouch. Many tests have failed without this. I will investigate...





[GitHub] [kafka] dajac merged pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13848:
URL: https://github.com/apache/kafka/pull/13848




[GitHub] [kafka] jolshan commented on pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13848:
URL: https://github.com/apache/kafka/pull/13848#issuecomment-1595350950

   Btw, @dajac do we typically autocreate this topic? Just curious because I've seen some issues with __transaction_state not having enough replicas in tests (taking 10 or so seconds to get a valid ISR). Just wondering if pre-creating the topic is a valid approach for tests.




[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Use admin client to create offsets topic in tests

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1227742933


##########
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##########
@@ -150,12 +150,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     listenerName: ListenerName = listenerName,
     adminClientConfig: Properties = new Properties
   ): Unit = {
-    if (isKRaftTest()) {
-      resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
-        TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
-      }
-    } else {
-      TestUtils.createOffsetsTopic(zkClient, servers)

Review Comment:
   Is there a reason why we kept using the zk client here? It seems much better to use the admin client all the time unless explicitly stated otherwise.





[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1231056163


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -611,11 +611,33 @@ object TestUtils extends Logging {
     */
   def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
-    createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
-      server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
-      server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      server.groupCoordinator.groupMetadataTopicConfigs)
+    val numPartitions = server.config.offsetsTopicPartitions
+    val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+
+    try {
+      createTopic(
+        zkClient,
+        Topic.GROUP_METADATA_TOPIC_NAME,
+        numPartitions,
+        replicationFactor,
+        servers,
+        server.groupCoordinator.groupMetadataTopicConfigs
+      )
+    } catch {
+      case ex: TopicExistsException =>
+        val allPartitionsMetadata = waitForAllPartitionsMetadata(
+          servers,
+          Topic.GROUP_METADATA_TOPIC_NAME,
+          numPartitions
+        )
+
+        // If the topic already exists, we ensure that it has the required
+        // number of partitions and replication factor. If it has not, the
+        // exception is thrown further.
+        if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {

Review Comment:
   I disagree because we only use this method when we create a new cluster. However, I agree that it could be brittle if we were using it differently. I think that you could always argue that you just want this method to ensure that the topic is present regardless of whether it has already been used or not. I will extend the javadoc to make this clear.





[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1231061065


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -611,11 +611,33 @@ object TestUtils extends Logging {
     */
   def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
-    createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
-      server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
-      server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      server.groupCoordinator.groupMetadataTopicConfigs)
+    val numPartitions = server.config.offsetsTopicPartitions
+    val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+
+    try {
+      createTopic(
+        zkClient,
+        Topic.GROUP_METADATA_TOPIC_NAME,
+        numPartitions,
+        replicationFactor,
+        servers,
+        server.groupCoordinator.groupMetadataTopicConfigs
+      )
+    } catch {
+      case ex: TopicExistsException =>
+        val allPartitionsMetadata = waitForAllPartitionsMetadata(
+          servers,
+          Topic.GROUP_METADATA_TOPIC_NAME,
+          numPartitions
+        )
+
+        // If the topic already exists, we ensure that it has the required
+        // number of partitions and replication factor. If it has not, the
+        // exception is thrown further.
+        if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {

Review Comment:
   Done.





[GitHub] [kafka] dajac commented on a diff in pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13848:
URL: https://github.com/apache/kafka/pull/13848#discussion_r1230916047


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -611,11 +611,33 @@ object TestUtils extends Logging {
     */
   def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
-    createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
-      server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
-      server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      server.groupCoordinator.groupMetadataTopicConfigs)
+    val numPartitions = server.config.offsetsTopicPartitions
+    val replicationFactor = server.config.offsetsTopicReplicationFactor.toInt
+
+    try {
+      createTopic(
+        zkClient,
+        Topic.GROUP_METADATA_TOPIC_NAME,
+        numPartitions,
+        replicationFactor,
+        servers,
+        server.groupCoordinator.groupMetadataTopicConfigs
+      )
+    } catch {
+      case ex: TopicExistsException =>
+        val allPartitionsMetadata = waitForAllPartitionsMetadata(
+          servers,
+          Topic.GROUP_METADATA_TOPIC_NAME,
+          numPartitions
+        )
+
+        // If the topic already exists, we ensure that it has the required
+        // number of partitions and replication factor. If it has not, the
+        // exception is thrown further.
+        if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {

Review Comment:
   We do the same when we create it with the admin client in KRaft mode and it has been enough until today. The cluster is usually recreated for all tests so it can't be leaked from a previous one. This error only happens when the first creation fails, for instance due to a network exception.
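
One plausible sequence behind this failure mode, sketched with a hypothetical in-memory `FlakyCluster`: the first request is applied, its response is lost, and the retry then sees the topic it created itself. This is an assumed illustration and not taken from the test logs.

```scala
object LostResponseRetrySketch {
  final class NetworkError extends RuntimeException("response lost")

  final class TopicAlreadyExists(name: String)
    extends RuntimeException(s"Topic '$name' already exists.")

  // Hypothetical server whose first create is applied but whose reply never arrives.
  final class FlakyCluster {
    private val topics = scala.collection.mutable.Set.empty[String]
    private var dropNextResponse = true

    def create(name: String): Unit = {
      if (topics.contains(name)) throw new TopicAlreadyExists(name)
      topics += name
      if (dropNextResponse) {
        dropNextResponse = false
        throw new NetworkError
      }
    }
  }

  // Naive retry: repeats the request once after a network error and reports the outcome.
  def createWithRetry(cluster: FlakyCluster, name: String): Either[Throwable, Unit] =
    try Right(cluster.create(name))
    catch {
      case _: NetworkError =>
        try Right(cluster.create(name))
        catch { case e: TopicAlreadyExists => Left(e) }
    }
}
```

Here the retry ends in `Left(TopicAlreadyExists)` even though the cluster is brand new, which is why tolerating an existing topic with the expected layout is enough in this scenario.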





