You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/22 14:53:47 UTC

[kafka] branch 2.0 updated: MINOR: Cleanup threads in integration tests (#5269)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new f7a9071  MINOR: Cleanup threads in integration tests (#5269)
f7a9071 is described below

commit f7a9071cef0130fc6b26b3e6cf56417d07ded5c8
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jun 22 15:41:52 2018 +0100

    MINOR: Cleanup threads in integration tests (#5269)
    
    Leftover threads doing network I/O can interfere with subsequent tests. Add missing shutdown in tests and include admin client in the check for leftover threads.
    
    Reviewers: Anna Povzner <an...@confluent.io>, Dhruvil Shah <dh...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Manikumar Reddy O <ma...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  7 ++++-
 .../clients/admin/AdminClientUnitTestEnv.java      |  4 +++
 .../kafka/clients/admin/MockAdminClient.java       |  1 -
 .../kafka/api/AdminClientIntegrationTest.scala     |  4 +--
 .../kafka/api/AuthorizerIntegrationTest.scala      | 20 ++++++++----
 .../kafka/api/CustomQuotaCallbackTest.scala        |  1 +
 .../delegation/DelegationTokenManagerTest.scala    | 36 ++++++++++++++--------
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  2 ++
 8 files changed, 52 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 495095a..62b6b6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -164,6 +164,11 @@ public class KafkaAdminClient extends AdminClient {
      */
     private static final long INVALID_SHUTDOWN_TIME = -1;
 
+    /**
+     * Thread name prefix for admin client network thread
+     */
+    static final String NETWORK_THREAD_PREFIX = "kafka-admin-client-thread";
+
     private final Logger log;
 
     /**
@@ -407,7 +412,7 @@ public class KafkaAdminClient extends AdminClient {
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
-        String threadName = "kafka-admin-client-thread | " + clientId;
+        String threadName = NETWORK_THREAD_PREFIX + " | " + clientId;
         this.thread = new KafkaThread(threadName, runnable, true);
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 3cd807d..e77b48b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -109,4 +109,8 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         }
         return map;
     }
+
+    public static String kafkaAdminClientNetworkThreadPrefix() {
+        return KafkaAdminClient.NETWORK_THREAD_PREFIX;
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2fc7048..5175072 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -390,5 +390,4 @@ public class MockAdminClient extends AdminClient {
             this.configs = configs != null ? configs : Collections.<String, String>emptyMap();
         }
     }
-
 }
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 037bf85..fe98fda 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1089,7 +1089,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val config = createConfig()
     config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000")
     val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory()
-    val client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
+    client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
     val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
         new CreateTopicsOptions().validateOnly(true)).all()
     assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
@@ -1105,7 +1105,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testConsumerGroups(): Unit = {
     val config = createConfig()
-    val client = AdminClient.create(config)
+    client = AdminClient.create(config)
     try {
       // Verify that initially there are no consumer groups to list.
       val list1 = client.listConsumerGroups()
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f4ff8e1..30456c6 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -46,7 +46,6 @@ import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
-
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 
@@ -108,6 +107,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val producerConfig = new Properties
   val numRecords = 1
 
+  val adminClients = Buffer[AdminClient]()
+
   override def propertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
@@ -261,6 +262,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     producers.foreach(_.close())
     consumers.foreach(_.wakeup())
     consumers.foreach(_.close())
+    adminClients.foreach(_.close())
     removeAllAcls()
     super.tearDown()
   }
@@ -1005,14 +1007,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test(expected = classOf[GroupAuthorizationException])
   def testDescribeGroupApiWithNoGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
+    createAdminClient().describeConsumerGroup(group)
   }
 
   @Test
   def testDescribeGroupApiWithGroupDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
+    createAdminClient().describeConsumerGroup(group)
   }
 
   @Test
@@ -1034,7 +1036,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
-    val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+    val result = createAdminClient().deleteConsumerGroups(List(group))
     assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
   }
 
@@ -1044,13 +1046,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
-    val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+    val result = createAdminClient().deleteConsumerGroups(List(group))
     assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
   }
 
   @Test
   def testDeleteGroupApiWithNoDeleteGroupAcl2() {
-    val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group))
+    val result = createAdminClient().deleteConsumerGroups(List(group))
     assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
   }
 
@@ -1459,4 +1461,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     producer
   }
 
+  private def createAdminClient(): AdminClient = {
+    val adminClient = AdminClient.createSimplePlaintext(brokerList)
+    adminClients += adminClient
+    adminClient
+  }
+
 }
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 7c052f8..eb8f11d 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -89,6 +89,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     producers.clear()
     consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
     consumers.clear()
+    adminClients.foreach(_.close())
     super.tearDown()
   }
 
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index da7a22a..b8d4376 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -26,24 +26,26 @@ import kafka.security.auth.Acl.WildCardHost
 import kafka.security.auth._
 import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaConfig}
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
-import org.apache.kafka.common.utils.{MockTime, SecurityUtils}
+import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.Buffer
 
 class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
 
   val time = new MockTime()
   val owner = SecurityUtils.parseKafkaPrincipal("User:owner")
   val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"))
+  val tokenManagers = Buffer[DelegationTokenManager]()
 
   val masterKey = "masterKey"
   val maxLifeTimeMsDefault = Defaults.DelegationTokenMaxLifeTimeMsDefault
@@ -64,11 +66,17 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames())
   }
 
+  @After
+  override def tearDown(): Unit = {
+    tokenManagers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
   @Test
   def testTokenRequestsWithDelegationTokenDisabled(): Unit = {
     val props: Properties = TestUtils.createBrokerConfig(0, zkConnect)
     val config = KafkaConfig.fromProps(props)
-    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
 
     tokenManager.createToken(owner, renewer, -1, createTokenResultCallBack)
     assertEquals(Errors.DELEGATION_TOKEN_AUTH_DISABLED, createTokenResult.error)
@@ -84,7 +92,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
   @Test
   def testCreateToken(): Unit = {
     val config = KafkaConfig.fromProps(props)
-    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
     tokenManager.startup
 
     tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
@@ -101,7 +109,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
   @Test
   def testRenewToken(): Unit = {
     val config = KafkaConfig.fromProps(props)
-    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
     tokenManager.startup
 
     tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
@@ -149,7 +157,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
   @Test
   def testExpireToken(): Unit = {
     val config = KafkaConfig.fromProps(props)
-    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
     tokenManager.startup
 
     tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
@@ -201,7 +209,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
 
     var hostSession = new Session(owner1, InetAddress.getByName("192.168.1.1"))
 
-    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
     tokenManager.startup
 
     //create tokens
@@ -281,7 +289,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
   @Test
   def testPeriodicTokenExpiry(): Unit = {
     val config = KafkaConfig.fromProps(props)
-    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
     tokenManager.startup
 
     //create tokens
@@ -297,11 +305,6 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
 
   }
 
-  @After
-  override def tearDown(): Unit = {
-    super.tearDown()
-  }
-
   private def createTokenResultCallBack(ret: CreateTokenResult): Unit = {
     createTokenResult = ret
   }
@@ -310,4 +313,11 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
     error = ret
     expiryTimeStamp = timeStamp
   }
+
+  private def createDelegationTokenManager(config: KafkaConfig, tokenCache: DelegationTokenCache,
+                                           time: Time, zkClient: KafkaZkClient): DelegationTokenManager = {
+    val tokenManager = new DelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManagers += tokenManager
+    tokenManager
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 034557e..8d34c48 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConverters._
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
+import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 
@@ -91,6 +92,7 @@ object ZooKeeperTestHarness {
   // which reset static JAAS configuration.
   val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName,
                                   KafkaProducer.NETWORK_THREAD_PREFIX,
+                                  AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
                                   AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
                                   ZkClientEventThreadPrefix)