You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/04 21:35:46 UTC

[kafka] branch trunk updated: MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7cb1600  MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)
7cb1600 is described below

commit 7cb1600d6ae7245bfa717fa93502d28f17096ea8
Author: THREE LEVEL HELMET <31...@users.noreply.github.com>
AuthorDate: Mon May 4 14:34:23 2020 -0700

    MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)
    
    Avoid calling into ConfigCommand and TopicCommand from tests that are not related
    to these commands.  It's better to just invoke the admin APIs.
    
    Change a few cases where we were testing the deprecated --zookeeper flag to test
    the --bootstrap-server flag instead.  Unless we're explicitly testing the deprecated code
    path, we should be using the non-deprecated flags.
    
    Move testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient from
    TopicCommandWithAdminClientTest.scala into TopicCommandWithZKClientTest.scala,
    since it makes more sense in the latter.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../scala/integration/kafka/api/SaslSetup.scala    | 33 +++++++++++++++-------
 .../server/DynamicBrokerReconfigurationTest.scala  | 18 ++++++++----
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   | 16 +++++++----
 .../admin/ReassignPartitionsCommandArgsTest.scala  |  6 ++--
 .../admin/TopicCommandWithAdminClientTest.scala    |  7 -----
 .../kafka/admin/TopicCommandWithZKClientTest.scala | 24 +++++++++++++++-
 6 files changed, 73 insertions(+), 31 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 7b06eb4..542f7e1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -22,17 +22,18 @@ import java.util.Properties
 import javax.security.auth.login.Configuration
 
 import scala.collection.Seq
-
-import kafka.admin.ConfigCommand
 import kafka.security.minikdc.MiniKdc
-import kafka.server.KafkaConfig
+import kafka.server.{ConfigType, KafkaConfig}
 import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
 import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.authenticator.LoginManager
-import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
+import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.client.ZKClientConfig
 
 /*
  * Implements an enumeration for the modes enabled here:
@@ -148,12 +149,24 @@ trait SaslSetup {
   }
 
   def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = {
-    val credentials = ScramMechanism.values.map(m => s"${m.mechanismName}=[iterations=4096,password=$password]")
-    val args = Array("--zookeeper", zkConnect,
-      "--alter", "--add-config", credentials.mkString(","),
-      "--entity-type", "users",
-      "--entity-name", userName)
-    ConfigCommand.main(args)
+    val zkClientConfig = new ZKClientConfig()
+    val zkClient = KafkaZkClient(
+      zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
+      Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    val entityType = ConfigType.User
+    val entityName = userName
+    val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
+
+    ScramMechanism.values().foreach(mechanism => {
+      val credential = new ScramFormatter(mechanism).generateCredential(password, 4096)
+      val credentialString = ScramCredentialUtils.credentialToString(credential)
+      configs.setProperty(mechanism.mechanismName, credentialString)
+    })
+
+    adminZkClient.changeConfigs(entityType, entityName, configs)
+    zkClient.close()
   }
 
 }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b610db2..7a809be 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1412,13 +1412,21 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
     zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
 
-    val args = Array("--zookeeper", kafkaConfig.zkConnect,
-      "--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
-      "--entity-type", "brokers",
-      "--entity-name", kafkaConfig.brokerId.toString)
-    ConfigCommand.main(args)
+    val entityType = ConfigType.Broker
+    val entityName = kafkaConfig.brokerId.toString
 
+    val passwordConfigs = sslStoreProps.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
     val passwordEncoder = createPasswordEncoder(kafkaConfig, kafkaConfig.passwordEncoderSecret)
+
+    if (passwordConfigs.nonEmpty) {
+      passwordConfigs.foreach { configName =>
+        val encodedValue = passwordEncoder.encode(new Password(sslStoreProps.getProperty(configName)))
+        sslStoreProps.setProperty(configName, encodedValue)
+      }
+    }
+    sslStoreProps.remove(KafkaConfig.PasswordEncoderSecretProp)
+    adminZkClient.changeConfigs(entityType, entityName, sslStoreProps)
+
     val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)
     assertEquals(4, brokerProps.size)
     assertEquals(sslProperties.get(SSL_KEYSTORE_TYPE_CONFIG),
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index afa571a..d59d942 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -27,13 +27,13 @@ import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
 import org.junit.{After, Test}
-import kafka.admin.TopicCommand.ZookeeperTopicService
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import org.scalatest.Assertions.fail
+import scala.jdk.CollectionConverters._
 
 class DeleteTopicTest extends ZooKeeperTestHarness {
 
@@ -227,9 +227,14 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
 
     // increase the partition count for topic
-    val topicCommandOptions = new TopicCommand.TopicCommandOptions(Array("--zookeeper", zkConnect, "--alter", "--topic", topic, "--partitions", "2"))
-    new ZookeeperTopicService(zkClient).alterTopic(topicCommandOptions)
-
+    val props = new Properties()
+    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+    val adminClient = Admin.create(props)
+    try {
+      adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
+    } catch {
+      case _: ExecutionException =>
+    }
     // trigger a controller switch now
     val previousControllerId = controllerId
 
@@ -246,6 +251,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     follower.startup()
     controller.startup()
     TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+    adminClient.close()
   }
 
 
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
index d15ad82..adeec20 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -255,7 +255,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldNotAllowBrokersListWithVerifyOption(): Unit = {
     val args = Array(
-      "--zookeeper", "localhost:1234",
+      "--bootstrap-server", "localhost:1234",
       "--verify",
       "--broker-list", "100,101",
       "--reassignment-json-file", "myfile.json")
@@ -265,7 +265,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldNotAllowThrottleWithVerifyOption(): Unit = {
     val args = Array(
-      "--zookeeper", "localhost:1234",
+      "--bootstrap-server", "localhost:1234",
       "--verify",
       "--throttle", "100",
       "--reassignment-json-file", "myfile.json")
@@ -275,7 +275,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldNotAllowTopicsOptionWithVerify(): Unit = {
     val args = Array(
-      "--zookeeper", "localhost:1234",
+      "--bootstrap-server", "localhost:1234",
       "--verify",
       "--reassignment-json-file", "myfile.json",
       "--topics-to-move-json-file", "myfile.json")
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 409b76b..04712d6 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -294,13 +294,6 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
   }
 
   @Test
-  def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = {
-    assertExitCode(1, () =>
-      new TopicCommandOptions(Array("--create", "--zookeeper", "zk", "--topic", testTopicName)).checkArgs()
-    )
-  }
-
-  @Test
   def testInvalidTopicLevelConfig(): Unit = {
     val createOpts = new TopicCommandOptions(
       Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName,
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
index 64233b5..3568cd9 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
@@ -18,7 +18,7 @@ package kafka.admin
 
 import kafka.admin.TopicCommand.{TopicCommandOptions, ZookeeperTopicService}
 import kafka.server.ConfigType
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode, ZooKeeperTestHarness}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -597,4 +597,26 @@ class TopicCommandWithZKClientTest extends ZooKeeperTestHarness with Logging wit
     expectAlterInternalTopicPartitionCountFailed(Topic.GROUP_METADATA_TOPIC_NAME)
     expectAlterInternalTopicPartitionCountFailed(Topic.TRANSACTION_STATE_TOPIC_NAME)
   }
+
+  @Test
+  def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = {
+    assertExitCode(1, () =>
+      new TopicCommandOptions(Array("--create", "--zookeeper", "zk", "--topic", testTopicName)).checkArgs()
+    )
+  }
+
+  def assertExitCode(expected: Int, method: () => Unit): Unit = {
+    def mockExitProcedure(exitCode: Int, exitMessage: Option[String]): Nothing = {
+      assertEquals(expected, exitCode)
+      throw new RuntimeException
+    }
+    Exit.setExitProcedure(mockExitProcedure)
+    try {
+      intercept[RuntimeException] {
+        method()
+      }
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
 }