You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/22 14:53:47 UTC
[kafka] branch 2.0 updated: MINOR: Cleanup threads in integration tests (#5269)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new f7a9071 MINOR: Cleanup threads in integration tests (#5269)
f7a9071 is described below
commit f7a9071cef0130fc6b26b3e6cf56417d07ded5c8
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jun 22 15:41:52 2018 +0100
MINOR: Cleanup threads in integration tests (#5269)
Leftover threads doing network I/O can interfere with subsequent tests. Add the missing shutdown/close calls in tests and include the admin client network thread in the check for leftover threads.
Reviewers: Anna Povzner <an...@confluent.io>, Dhruvil Shah <dh...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Manikumar Reddy O <ma...@gmail.com>
---
.../kafka/clients/admin/KafkaAdminClient.java | 7 ++++-
.../clients/admin/AdminClientUnitTestEnv.java | 4 +++
.../kafka/clients/admin/MockAdminClient.java | 1 -
.../kafka/api/AdminClientIntegrationTest.scala | 4 +--
.../kafka/api/AuthorizerIntegrationTest.scala | 20 ++++++++----
.../kafka/api/CustomQuotaCallbackTest.scala | 1 +
.../delegation/DelegationTokenManagerTest.scala | 36 ++++++++++++++--------
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 ++
8 files changed, 52 insertions(+), 23 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 495095a..62b6b6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -164,6 +164,11 @@ public class KafkaAdminClient extends AdminClient {
*/
private static final long INVALID_SHUTDOWN_TIME = -1;
+ /**
+ * Thread name prefix for admin client network thread
+ */
+ static final String NETWORK_THREAD_PREFIX = "kafka-admin-client-thread";
+
private final Logger log;
/**
@@ -407,7 +412,7 @@ public class KafkaAdminClient extends AdminClient {
this.metrics = metrics;
this.client = client;
this.runnable = new AdminClientRunnable();
- String threadName = "kafka-admin-client-thread | " + clientId;
+ String threadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.thread = new KafkaThread(threadName, runnable, true);
this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
new TimeoutProcessorFactory() : timeoutProcessorFactory;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 3cd807d..e77b48b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -109,4 +109,8 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
}
return map;
}
+
+ public static String kafkaAdminClientNetworkThreadPrefix() {
+ return KafkaAdminClient.NETWORK_THREAD_PREFIX;
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2fc7048..5175072 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -390,5 +390,4 @@ public class MockAdminClient extends AdminClient {
this.configs = configs != null ? configs : Collections.<String, String>emptyMap();
}
}
-
}
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 037bf85..fe98fda 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1089,7 +1089,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val config = createConfig()
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000")
val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory()
- val client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
+ client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
new CreateTopicsOptions().validateOnly(true)).all()
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
@@ -1105,7 +1105,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testConsumerGroups(): Unit = {
val config = createConfig()
- val client = AdminClient.create(config)
+ client = AdminClient.create(config)
try {
// Verify that initially there are no consumer groups to list.
val list1 = client.listConsumerGroups()
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f4ff8e1..30456c6 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -46,7 +46,6 @@ import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
-
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@@ -108,6 +107,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val producerConfig = new Properties
val numRecords = 1
+ val adminClients = Buffer[AdminClient]()
+
override def propertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
@@ -261,6 +262,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
producers.foreach(_.close())
consumers.foreach(_.wakeup())
consumers.foreach(_.close())
+ adminClients.foreach(_.close())
removeAllAcls()
super.tearDown()
}
@@ -1005,14 +1007,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test(expected = classOf[GroupAuthorizationException])
def testDescribeGroupApiWithNoGroupAcl() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
- AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
+ createAdminClient().describeConsumerGroup(group)
}
@Test
def testDescribeGroupApiWithGroupDescribe() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
- AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
+ createAdminClient().describeConsumerGroup(group)
}
@Test
@@ -1034,7 +1036,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
- val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+ val result = createAdminClient().deleteConsumerGroups(List(group))
assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
}
@@ -1044,13 +1046,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
- val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+ val result = createAdminClient().deleteConsumerGroups(List(group))
assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
}
@Test
def testDeleteGroupApiWithNoDeleteGroupAcl2() {
- val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+ val result = createAdminClient().deleteConsumerGroups(List(group))
assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
}
@@ -1459,4 +1461,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
producer
}
+ private def createAdminClient(): AdminClient = {
+ val adminClient = AdminClient.createSimplePlaintext(brokerList)
+ adminClients += adminClient
+ adminClient
+ }
+
}
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 7c052f8..eb8f11d 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -89,6 +89,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
producers.clear()
consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
consumers.clear()
+ adminClients.foreach(_.close())
super.tearDown()
}
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index da7a22a..b8d4376 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -26,24 +26,26 @@ import kafka.security.auth.Acl.WildCardHost
import kafka.security.auth._
import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig}
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
-import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
+import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
+import scala.collection.mutable.Buffer
class DelegationTokenManagerTest extends ZooKeeperTestHarness {
val time = new MockTime()
val owner = SecurityUtils.parseKafkaPrincipal("User:owner")
val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"))
+ val tokenManagers = Buffer[DelegationTokenManager]()
val masterKey = "masterKey"
val maxLifeTimeMsDefault = Defaults.DelegationTokenMaxLifeTimeMsDefault
@@ -64,11 +66,17 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames())
}
+ @After
+ override def tearDown(): Unit = {
+ tokenManagers.foreach(_.shutdown())
+ super.tearDown()
+ }
+
@Test
def testTokenRequestsWithDelegationTokenDisabled(): Unit = {
val props: Properties = TestUtils.createBrokerConfig(0, zkConnect)
val config = KafkaConfig.fromProps(props)
- val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.createToken(owner, renewer, -1, createTokenResultCallBack)
assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createTokenResult.error)
@@ -84,7 +92,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
@Test
def testCreateToken(): Unit = {
val config = KafkaConfig.fromProps(props)
- val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
@@ -101,7 +109,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
@Test
def testRenewToken(): Unit = {
val config = KafkaConfig.fromProps(props)
- val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
@@ -149,7 +157,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
@Test
def testExpireToken(): Unit = {
val config = KafkaConfig.fromProps(props)
- val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
@@ -201,7 +209,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
var hostSession = new Session(owner1, InetAddress.getByName("192.168.1.1"))
- val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
//create tokens
@@ -281,7 +289,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
@Test
def testPeriodicTokenExpiry(): Unit = {
val config = KafkaConfig.fromProps(props)
- val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
tokenManager.startup
//create tokens
@@ -297,11 +305,6 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
}
- @After
- override def tearDown(): Unit = {
- super.tearDown()
- }
-
private def createTokenResultCallBack(ret: CreateTokenResult): Unit = {
createTokenResult = ret
}
@@ -310,4 +313,11 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
error = ret
expiryTimeStamp = timeStamp
}
+
+ private def createDelegationTokenManager(config: KafkaConfig, tokenCache: DelegationTokenCache,
+ time: Time, zkClient: KafkaZkClient): DelegationTokenManager = {
+ val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+ tokenManagers += tokenManager
+ tokenManager
+ }
}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 034557e..8d34c48 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
+import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
@@ -91,6 +92,7 @@ object ZooKeeperTestHarness {
// which reset static JAAS configuration.
val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName,
KafkaProducer.NETWORK_THREAD_PREFIX,
+ AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
ZkClientEventThreadPrefix)