You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/16 02:30:44 UTC

[kafka] branch trunk updated: MINOR: Remove redundant forwarding integration tests (#11766)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b765a2b  MINOR: Remove redundant forwarding integration tests (#11766)
b765a2b is described below

commit b765a2b44e9e7c7fd662cb7e0ed7d964a0ddeeed
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Feb 15 18:28:34 2022 -0800

    MINOR: Remove redundant forwarding integration tests (#11766)
    
    There are a few integration tests for the forwarding logic that were added before KRaft was ready for integration testing. Now that KRaft is enabled in integration tests, these tests are redundant and can be removed.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
 .../main/scala/kafka/server/ControllerServer.scala |   2 +-
 .../kafka/api/ConsumerTopicCreationTest.scala      |  18 ----
 .../kafka/integration/KafkaServerTestHarness.scala |   4 +-
 .../CreateTopicsRequestWithForwardingTest.scala    |  39 --------
 .../server/MetadataRequestWithForwardingTest.scala | 111 ---------------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   6 +-
 6 files changed, 5 insertions(+), 175 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 9e46514..f1ee345 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -94,7 +94,7 @@ class ControllerServer(
     true
   }
 
-  def clusterId: String = metaProperties.clusterId.toString
+  def clusterId: String = metaProperties.clusterId
 
   def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
index b4fb1e8..86de9a5 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -22,14 +22,12 @@ import java.time.Duration
 import java.util
 import java.util.Collections
 
-import kafka.api
 import kafka.server.KafkaConfig
 import kafka.utils.{EmptyTestInfo, TestUtils}
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
@@ -46,26 +44,10 @@ class ConsumerTopicCreationTest {
     try testCase.test() finally testCase.tearDown()
   }
 
-  @Disabled("Enable after enable KIP-590 forwarding in KAFKA-12886")
-  @ParameterizedTest
-  @MethodSource(Array("parameters"))
-  def testAutoTopicCreationWithForwarding(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean): Unit = {
-    val testCase = new api.ConsumerTopicCreationTest.TestCaseWithForwarding(brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics)
-    testCase.setUp(new EmptyTestInfo())
-    try testCase.test() finally testCase.tearDown()
-  }
 }
 
 object ConsumerTopicCreationTest {
 
-  private class TestCaseWithForwarding(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean)
-    extends TestCase(brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics) {
-
-    override protected def brokerCount: Int = 3
-
-    override def enableForwarding: Boolean = true
-  }
-
   private class TestCase(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness {
     private val topic_1 = "topic-1"
     private val topic_2 = "topic-2"
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index f45b99d..393d4b8c0 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -102,7 +102,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   protected def serverSaslProperties: Option[Properties] = None
   protected def clientSaslProperties: Option[Properties] = None
   protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
-  protected def enableForwarding: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
@@ -290,7 +289,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     brokerList = if (startup) TestUtils.bootstrapServers(_brokers, listenerName) else null
   }
 
-  private def createBrokerFromConfig(config: KafkaConfig) = {
+  private def createBrokerFromConfig(config: KafkaConfig): KafkaBroker = {
     if (isKRaftTest()) {
       createBroker(config, brokerTime(config.brokerId), startup = false)
     } else {
@@ -298,7 +297,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
         config,
         time = brokerTime(config.brokerId),
         threadNamePrefix = None,
-        enableForwarding,
         startup = false
       )
     }
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
deleted file mode 100644
index cbfdf72e..0000000
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.protocol.Errors
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-import scala.jdk.CollectionConverters._
-
-class CreateTopicsRequestWithForwardingTest extends AbstractCreateTopicsRequestTest {
-
-  override def enableForwarding: Boolean = true
-
-  @ParameterizedTest
-  @ValueSource(strings = Array("kraft"))
-  def testForwardToController(quorum: String): Unit = {
-    val req = topicsReq(Seq(topicReq("topic1")))
-    val response = sendCreateTopicRequest(req, notControllerSocketServer)
-    // With forwarding enabled, request could be forwarded to the active controller.
-    assertEquals(Map(Errors.NONE -> 1), response.errorCounts().asScala)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
deleted file mode 100644
index 3580e2b..0000000
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.TestUtils
-import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.MetadataRequest
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-
-class MetadataRequestWithForwardingTest extends AbstractMetadataRequestTest {
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    doSetup(testInfo, createOffsetsTopic = false)
-  }
-
-  override def enableForwarding: Boolean = true
-
-  @Test
-  def testAutoTopicCreation(): Unit = {
-    val topic1 = "t1"
-    val topic2 = "t2"
-    val topic3 = "t3"
-    val topic4 = "t4"
-    val topic5 = "t5"
-    createTopic(topic1)
-
-    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
-    assertNull(response1.errors.get(topic1))
-    checkAutoCreatedTopic(topic2, response1)
-
-    // The default behavior in old versions of the metadata API is to allow topic creation, so
-    // protocol downgrades should happen gracefully when auto-creation is explicitly requested.
-    val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
-    checkAutoCreatedTopic(topic3, response2)
-
-    // V3 doesn't support a configurable allowAutoTopicCreation, so disabling auto-creation is not supported
-    assertThrows(classOf[UnsupportedVersionException], () => sendMetadataRequest(new MetadataRequest(requestData(List(topic4), false), 3.toShort)))
-
-    // V4 and higher support a configurable allowAutoTopicCreation
-    val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic5))
-    assertEquals(None, zkClient.getTopicPartitionCount(topic5))
-  }
-
-  @Test
-  def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
-    // Shutdown all but one broker so that the number of brokers is less than the default replication factor
-    servers.tail.foreach(_.shutdown())
-    servers.tail.foreach(_.awaitShutdown())
-
-    val topic1 = "testAutoCreateTopic"
-    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, true).build)
-    assertEquals(1, response1.topicMetadata.size)
-    val topicMetadata = response1.topicMetadata.asScala.head
-    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicMetadata.error)
-    assertEquals(topic1, topicMetadata.topic)
-    assertEquals(0, topicMetadata.partitionMetadata.size)
-  }
-
-  @Test
-  def testAutoCreateOfCollidingTopics(): Unit = {
-    val topic1 = "testAutoCreate.Topic"
-    val topic2 = "testAutoCreate_Topic"
-    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build)
-    assertEquals(2, response1.topicMetadata.size)
-
-    val responseMap = response1.topicMetadata.asScala.map(metadata => (metadata.topic(), metadata.error)).toMap
-
-    assertEquals(Set(topic1, topic2), responseMap.keySet)
-    // The topic creation will be delayed, and the name collision error will be swallowed.
-    assertEquals(Set(Errors.LEADER_NOT_AVAILABLE, Errors.INVALID_TOPIC_EXCEPTION), responseMap.values.toSet)
-
-    val topicCreated = responseMap.head._1
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicCreated, 0)
-    TestUtils.waitForPartitionMetadata(servers, topicCreated, 0)
-
-    // retry the metadata for the first auto created topic
-    val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topicCreated).asJava, true).build)
-    val topicMetadata1 = response2.topicMetadata.asScala.head
-    assertEquals(Errors.NONE, topicMetadata1.error)
-    assertEquals(Seq(Errors.NONE), topicMetadata1.partitionMetadata.asScala.map(_.error))
-    assertEquals(1, topicMetadata1.partitionMetadata.size)
-    val partitionMetadata = topicMetadata1.partitionMetadata.asScala.head
-    assertEquals(0, partitionMetadata.partition)
-    assertEquals(2, partitionMetadata.replicaIds.size)
-    assertTrue(partitionMetadata.leaderId.isPresent)
-    assertTrue(partitionMetadata.leaderId.get >= 0)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1ffd6e0..9f9f738 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -170,11 +170,11 @@ object TestUtils extends Logging {
   }
 
   def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String]): KafkaServer = {
-    createServer(config, time, threadNamePrefix, enableForwarding = false, startup = true)
+    createServer(config, time, threadNamePrefix, startup = true)
   }
 
-  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], enableForwarding: Boolean, startup: Boolean): KafkaServer = {
-    val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding)
+  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = {
+    val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = false)
     if (startup) server.startup()
     server
   }