You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/04 21:35:46 UTC
[kafka] branch trunk updated: MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7cb1600 MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)
7cb1600 is described below
commit 7cb1600d6ae7245bfa717fa93502d28f17096ea8
Author: THREE LEVEL HELMET <31...@users.noreply.github.com>
AuthorDate: Mon May 4 14:34:23 2020 -0700
MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)
Avoid calling into ConfigCommand and TopicCommand from tests that are not related
to these commands. It's better to just invoke the admin APIs.
Change a few cases where we were testing the deprecated --zookeeper flag to testing
the --bootstrap-server flag instead. Unless we're explicitly testing the deprecated code
path, we should be using the non-deprecated flags.
Move testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient from
TopicCommandWithAdminClientTest.scala into TopicCommandWithZKClientTest.scala,
since it makes more sense in the latter.
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../scala/integration/kafka/api/SaslSetup.scala | 33 +++++++++++++++-------
.../server/DynamicBrokerReconfigurationTest.scala | 18 ++++++++----
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 16 +++++++----
.../admin/ReassignPartitionsCommandArgsTest.scala | 6 ++--
.../admin/TopicCommandWithAdminClientTest.scala | 7 -----
.../kafka/admin/TopicCommandWithZKClientTest.scala | 24 +++++++++++++++-
6 files changed, 73 insertions(+), 31 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 7b06eb4..542f7e1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -22,17 +22,18 @@ import java.util.Properties
import javax.security.auth.login.Configuration
import scala.collection.Seq
-
-import kafka.admin.ConfigCommand
import kafka.security.minikdc.MiniKdc
-import kafka.server.KafkaConfig
+import kafka.server.{ConfigType, KafkaConfig}
import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.authenticator.LoginManager
-import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
+import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.client.ZKClientConfig
/*
* Implements an enumeration for the modes enabled here:
@@ -148,12 +149,24 @@ trait SaslSetup {
}
def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = {
- val credentials = ScramMechanism.values.map(m => s"${m.mechanismName}=[iterations=4096,password=$password]")
- val args = Array("--zookeeper", zkConnect,
- "--alter", "--add-config", credentials.mkString(","),
- "--entity-type", "users",
- "--entity-name", userName)
- ConfigCommand.main(args)
+ val zkClientConfig = new ZKClientConfig()
+ val zkClient = KafkaZkClient(
+ zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
+ Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+ val adminZkClient = new AdminZkClient(zkClient)
+
+ val entityType = ConfigType.User
+ val entityName = userName
+ val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
+
+ ScramMechanism.values().foreach(mechanism => {
+ val credential = new ScramFormatter(mechanism).generateCredential(password, 4096)
+ val credentialString = ScramCredentialUtils.credentialToString(credential)
+ configs.setProperty(mechanism.mechanismName, credentialString)
+ })
+
+ adminZkClient.changeConfigs(entityType, entityName, configs)
+ zkClient.close()
}
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b610db2..7a809be 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1412,13 +1412,21 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- val args = Array("--zookeeper", kafkaConfig.zkConnect,
- "--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
- "--entity-type", "brokers",
- "--entity-name", kafkaConfig.brokerId.toString)
- ConfigCommand.main(args)
+ val entityType = ConfigType.Broker
+ val entityName = kafkaConfig.brokerId.toString
+ val passwordConfigs = sslStoreProps.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
val passwordEncoder = createPasswordEncoder(kafkaConfig, kafkaConfig.passwordEncoderSecret)
+
+ if (passwordConfigs.nonEmpty) {
+ passwordConfigs.foreach { configName =>
+ val encodedValue = passwordEncoder.encode(new Password(sslStoreProps.getProperty(configName)))
+ sslStoreProps.setProperty(configName, encodedValue)
+ }
+ }
+ sslStoreProps.remove(KafkaConfig.PasswordEncoderSecretProp)
+ adminZkClient.changeConfigs(entityType, entityName, sslStoreProps)
+
val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)
assertEquals(4, brokerProps.size)
assertEquals(sslProperties.get(SSL_KEYSTORE_TYPE_CONFIG),
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index afa571a..d59d942 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -27,13 +27,13 @@ import kafka.utils.TestUtils
import kafka.server.{KafkaConfig, KafkaServer}
import org.junit.Assert._
import org.junit.{After, Test}
-import kafka.admin.TopicCommand.ZookeeperTopicService
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.scalatest.Assertions.fail
+import scala.jdk.CollectionConverters._
class DeleteTopicTest extends ZooKeeperTestHarness {
@@ -227,9 +227,14 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
}, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
// increase the partition count for topic
- val topicCommandOptions = new TopicCommand.TopicCommandOptions(Array("--zookeeper", zkConnect, "--alter", "--topic", topic, "--partitions", "2"))
- new ZookeeperTopicService(zkClient).alterTopic(topicCommandOptions)
-
+ val props = new Properties()
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+ val adminClient = Admin.create(props)
+ try {
+ adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
+ } catch {
+ case _: ExecutionException =>
+ }
// trigger a controller switch now
val previousControllerId = controllerId
@@ -246,6 +251,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
follower.startup()
controller.startup()
TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+ adminClient.close()
}
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
index d15ad82..adeec20 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -255,7 +255,7 @@ class ReassignPartitionsCommandArgsTest {
@Test
def shouldNotAllowBrokersListWithVerifyOption(): Unit = {
val args = Array(
- "--zookeeper", "localhost:1234",
+ "--bootstrap-server", "localhost:1234",
"--verify",
"--broker-list", "100,101",
"--reassignment-json-file", "myfile.json")
@@ -265,7 +265,7 @@ class ReassignPartitionsCommandArgsTest {
@Test
def shouldNotAllowThrottleWithVerifyOption(): Unit = {
val args = Array(
- "--zookeeper", "localhost:1234",
+ "--bootstrap-server", "localhost:1234",
"--verify",
"--throttle", "100",
"--reassignment-json-file", "myfile.json")
@@ -275,7 +275,7 @@ class ReassignPartitionsCommandArgsTest {
@Test
def shouldNotAllowTopicsOptionWithVerify(): Unit = {
val args = Array(
- "--zookeeper", "localhost:1234",
+ "--bootstrap-server", "localhost:1234",
"--verify",
"--reassignment-json-file", "myfile.json",
"--topics-to-move-json-file", "myfile.json")
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 409b76b..04712d6 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -294,13 +294,6 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
}
@Test
- def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = {
- assertExitCode(1, () =>
- new TopicCommandOptions(Array("--create", "--zookeeper", "zk", "--topic", testTopicName)).checkArgs()
- )
- }
-
- @Test
def testInvalidTopicLevelConfig(): Unit = {
val createOpts = new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName,
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
index 64233b5..3568cd9 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithZKClientTest.scala
@@ -18,7 +18,7 @@ package kafka.admin
import kafka.admin.TopicCommand.{TopicCommandOptions, ZookeeperTopicService}
import kafka.server.ConfigType
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Exit, Logging, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode, ZooKeeperTestHarness}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -597,4 +597,26 @@ class TopicCommandWithZKClientTest extends ZooKeeperTestHarness with Logging wit
expectAlterInternalTopicPartitionCountFailed(Topic.GROUP_METADATA_TOPIC_NAME)
expectAlterInternalTopicPartitionCountFailed(Topic.TRANSACTION_STATE_TOPIC_NAME)
}
+
+ @Test
+ def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = {
+ assertExitCode(1, () =>
+ new TopicCommandOptions(Array("--create", "--zookeeper", "zk", "--topic", testTopicName)).checkArgs()
+ )
+ }
+
+ def assertExitCode(expected: Int, method: () => Unit): Unit = {
+ def mockExitProcedure(exitCode: Int, exitMessage: Option[String]): Nothing = {
+ assertEquals(expected, exitCode)
+ throw new RuntimeException
+ }
+ Exit.setExitProcedure(mockExitProcedure)
+ try {
+ intercept[RuntimeException] {
+ method()
+ }
+ } finally {
+ Exit.resetExitProcedure()
+ }
+ }
}