You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/10/27 14:49:55 UTC

kafka git commit: MINOR: Ensure that the producer in testAlterReplicaLogDirs is always closed

Repository: kafka
Updated Branches:
  refs/heads/trunk 603d4e5d9 -> 4e8ad90b9


MINOR: Ensure that the producer in testAlterReplicaLogDirs is always closed

Failure to close the producer could cause a transient failure, more details
below.

The request timeout was only 2 seconds, exceptions thrown were not
propagated and the producer would not be closed. If the exception
was thrown during `send`, we did not increment `numMessages`
allowing the test to pass.

I have increased the timeout to 10 seconds and made sure that
exceptions are propagated.

Example of the error:

```text
kafka.api.SaslSslAdminClientIntegrationTest > classMethod STARTED

kafka.api.SaslSslAdminClientIntegrationTest > classMethod FAILED
    java.lang.AssertionError: Found unexpected threads, allThreads=Set(metrics-meter-tick-thread-2, Signal Dispatcher, main, Reference Handler, scala-execution-context-global-164, kafka-producer-network-thread | producer-1, scala-execution-context-global-166, Test worker, scala-execution-context-global-1249, /0:0:0:0:0:0:0:1:58910 to /0:0:0:0:0:0:0:1:43025 workers Thread 2, Finalizer, /0:0:0:0:0:0:0:1:58910 to /0:0:0:0:0:0:0:1:43025 workers Thread 3, scala-execution-context-global-163, metrics-meter-tick-thread-1)
```

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4144 from ijuma/ensure-producer-is-closed-test-alter-replica-log-dirs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e8ad90b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e8ad90b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e8ad90b

Branch: refs/heads/trunk
Commit: 4e8ad90b94f28affd8e29db136df5e6e7c6c6c15
Parents: 603d4e5
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Oct 27 15:49:42 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Oct 27 15:49:42 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/AdminClientIntegrationTest.scala  | 75 +++++++++-----------
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  8 +--
 2 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8ad90b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 583ba7a..d92ca30 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -21,7 +21,7 @@ import java.util.{Collections, Properties}
 import java.util.Arrays.asList
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import java.io.File
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import org.apache.kafka.clients.admin.KafkaAdminClientTest
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -45,6 +45,8 @@ import org.junit.Assert._
 
 import scala.util.Random
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
  * An integration test of the KafkaAdminClient.
@@ -278,13 +280,11 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
       assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir)
     }
-
-    client.close()
   }
 
   @Test
   def testAlterReplicaLogDirs(): Unit = {
-    val adminClient = AdminClient.create(createConfig())
+    client = AdminClient.create(createConfig())
     val topic = "topic"
     val tp = new TopicPartition(topic, 0)
     val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap
@@ -300,23 +300,21 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     }.toMap
 
     // Verify that replica can be created in the specified log directory
-    adminClient.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).values().asScala.values.foreach { future =>
-      try {
-        future.get()
-        fail("Future should fail with ReplicaNotAvailableException")
-      } catch {
-        case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ReplicaNotAvailableException])
-      }
+    val futures = client.alterReplicaLogDirs(firstReplicaAssignment.asJava,
+      new AlterReplicaLogDirsOptions).values.asScala.values
+    futures.foreach { future =>
+      val exception = intercept[ExecutionException](future.get)
+      assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
     }
 
-    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties())
+    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties)
     servers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
     }
 
     // Verify that replica can be moved to the specified log directory after the topic has been created
-    adminClient.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).all().get()
+    client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
     servers.foreach { server =>
       TestUtils.waitUntilTrue(() => {
         val logDir = server.logManager.getLog(tp).get.dir.getParent
@@ -326,48 +324,45 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
 
     // Verify that replica can be moved to the specified log directory while the producer is sending messages
     val running = new AtomicBoolean(true)
-    @volatile var numMessages = 0
-    val thread = new Thread() {
-      override def run(): Unit = {
-        val producer = TestUtils.createNewProducer(
-          TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
-          securityProtocol = securityProtocol,
-          trustStoreFile = trustStoreFile,
-          retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
-          requestTimeoutMs = 2000,
-          acks = -1
-        )
-
-        while (running.get()) {
+    val numMessages = new AtomicInteger
+    import scala.concurrent.ExecutionContext.Implicits._
+    val producerFuture = Future {
+      val producer = TestUtils.createNewProducer(
+        TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
+        securityProtocol = securityProtocol,
+        trustStoreFile = trustStoreFile,
+        retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
+        requestTimeoutMs = 10000,
+        acks = -1
+      )
+      try {
+        while (running.get) {
           val future = producer.send(new ProducerRecord(topic, s"xxxxxxxxxxxxxxxxxxxx-$numMessages".getBytes))
-          numMessages += 1
-          future.get()
+          numMessages.incrementAndGet()
+          future.get(10, TimeUnit.SECONDS)
         }
-        producer.close()
-      }
+        numMessages.get
+      } finally producer.close()
     }
 
     try {
-      thread.start()
-      TestUtils.waitUntilTrue(() => numMessages > 100, "timed out waiting for message produce", 6000L)
-      adminClient.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).all().get()
+      TestUtils.waitUntilTrue(() => numMessages.get > 100, "timed out waiting for message produce", 6000L)
+      client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
       servers.foreach { server =>
         TestUtils.waitUntilTrue(() => {
           val logDir = server.logManager.getLog(tp).get.dir.getParent
           firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
         }, "timed out waiting for replica movement", 6000L)
       }
-    } finally {
-      running.set(false)
-      thread.join()
-    }
+    } finally running.set(false)
+
+    val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
 
     // Verify that all messages that are produced can be consumed
-    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, numMessages, securityProtocol, trustStoreFile)
+    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, securityProtocol, trustStoreFile)
     consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
-      assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value()))
+      assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value))
     }
-    adminClient.close()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8ad90b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0a7e631..85a5596 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -81,7 +81,7 @@ object ZooKeeperTestHarness {
    */
   @BeforeClass
   def setUpClass() {
-    verifyNoUnexpectedThreads()
+    verifyNoUnexpectedThreads("@BeforeClass")
   }
 
   /**
@@ -89,18 +89,18 @@ object ZooKeeperTestHarness {
    */
   @AfterClass
   def tearDownClass() {
-    verifyNoUnexpectedThreads()
+    verifyNoUnexpectedThreads("@AfterClass")
   }
 
   /**
    * Verifies that threads which are known to cause transient failures in subsequent tests
    * have been shutdown.
    */
-  def verifyNoUnexpectedThreads() {
+  def verifyNoUnexpectedThreads(context: String) {
     def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
     val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
       threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
     }
-    assertTrue(s"Found unexpected threads, allThreads=$threads", noUnexpected)
+    assertTrue(s"Found unexpected threads during $context, allThreads=$threads", noUnexpected)
   }
 }