You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/03/18 08:52:16 UTC

[kafka] branch 2.2 updated: KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new d8527ed  KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456)
d8527ed is described below

commit d8527edc01b4397c128e397fdcc774808e126d0a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon Mar 18 08:47:28 2019 +0000

    KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456)
    
    We verify that ZK clients are closed in tests, since leaked clients can affect subsequent tests and make test failures hard to debug. However, because of changes to the ZooKeeper client, we were checking the wrong thread name. The thread name now used is <creatorThreadName>-EventThread, where creatorThreadName varies depending on the test. Fixed ZooKeeperTestHarness to check this format and fixed tests which were leaving ZK clients behind. Also added a test to make sure we can detect changes to [...]
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Manikumar Reddy <ma...@gmail.com>
---
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  4 +--
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 30 ++++++++++++++--------
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index ebb5fb7..5a62464 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -84,7 +84,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 }
 
 object ZooKeeperTestHarness {
-  val ZkClientEventThreadPrefix = "ZkClient-EventThread"
+  val ZkClientEventThreadSuffix = "-EventThread"
 
   // Threads which may cause transient failures in subsequent tests if not shutdown.
   // These include threads which make connections to brokers and may cause issues
@@ -94,7 +94,7 @@ object ZooKeeperTestHarness {
                                   KafkaProducer.NETWORK_THREAD_PREFIX,
                                   AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
                                   AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
-                                  ZkClientEventThreadPrefix)
+                                  ZkClientEventThreadSuffix)
 
   /**
    * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fd3f59c..e943348 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -82,8 +82,16 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testConnection(): Unit = {
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
-      "testMetricType").close()
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
+      "testMetricType")
+    try {
+      // Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients
+      val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
+      assertTrue(s"ZooKeeperClient event thread not found, threads=$threads",
+        threads.exists(_.contains(ZooKeeperTestHarness.ZkClientEventThreadSuffix)))
+    } finally {
+      client.close()
+    }
   }
 
   @Test
@@ -327,14 +335,15 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+    zooKeeperClient.close()
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
       "testMetricGroup", "testMetricType")
-    client.registerStateChangeHandler(stateChangeHandler)
+    zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
 
     val requestThread = new Thread() {
       override def run(): Unit = {
         try
-          client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+          zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
             ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
         finally
           latch.countDown()
@@ -343,7 +352,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
     val reinitializeThread = new Thread() {
       override def run(): Unit = {
-        client.forceReinitialize()
+        zooKeeperClient.forceReinitialize()
       }
     }
 
@@ -375,12 +384,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+    zooKeeperClient.close()
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
       "testMetricGroup", "testMetricType")
-    client.registerStateChangeHandler(faultyHandler)
-    client.registerStateChangeHandler(goodHandler)
+    zooKeeperClient.registerStateChangeHandler(faultyHandler)
+    zooKeeperClient.registerStateChangeHandler(goodHandler)
 
-    client.forceReinitialize()
+    zooKeeperClient.forceReinitialize()
 
     assertEquals(1, goodCalls.get)