You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/10/27 14:49:55 UTC
kafka git commit: MINOR: Ensure that the producer in
testAlterReplicaLogDirs is always closed
Repository: kafka
Updated Branches:
refs/heads/trunk 603d4e5d9 -> 4e8ad90b9
MINOR: Ensure that the producer in testAlterReplicaLogDirs is always closed
Failure to close the producer could cause a transient failure; more details
are given below.
The request timeout was only 2 seconds, exceptions thrown were not
propagated, and the producer would not be closed. If the exception
was thrown during `send`, we did not increment `numMessages`,
allowing the test to pass.
I have increased the timeout to 10 seconds and made sure that
exceptions are propagated.
Example of the error:
```text
kafka.api.SaslSslAdminClientIntegrationTest > classMethod STARTED
kafka.api.SaslSslAdminClientIntegrationTest > classMethod FAILED
java.lang.AssertionError: Found unexpected threads, allThreads=Set(metrics-meter-tick-thread-2, Signal Dispatcher, main, Reference Handler, scala-execution-context-global-164, kafka-producer-network-thread | producer-1, scala-execution-context-global-166, Test worker, scala-execution-context-global-1249, /0:0:0:0:0:0:0:1:58910 to /0:0:0:0:0:0:0:1:43025 workers Thread 2, Finalizer, /0:0:0:0:0:0:0:1:58910 to /0:0:0:0:0:0:0:1:43025 workers Thread 3, scala-execution-context-global-163, metrics-meter-tick-thread-1)
```
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #4144 from ijuma/ensure-producer-is-closed-test-alter-replica-log-dirs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e8ad90b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e8ad90b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e8ad90b
Branch: refs/heads/trunk
Commit: 4e8ad90b94f28affd8e29db136df5e6e7c6c6c15
Parents: 603d4e5
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Oct 27 15:49:42 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Oct 27 15:49:42 2017 +0100
----------------------------------------------------------------------
.../kafka/api/AdminClientIntegrationTest.scala | 75 +++++++++-----------
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 8 +--
2 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8ad90b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 583ba7a..d92ca30 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -21,7 +21,7 @@ import java.util.{Collections, Properties}
import java.util.Arrays.asList
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.io.File
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import org.apache.kafka.clients.admin.KafkaAdminClientTest
import org.apache.kafka.common.utils.{Time, Utils}
@@ -45,6 +45,8 @@ import org.junit.Assert._
import scala.util.Random
import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
/**
* An integration test of the KafkaAdminClient.
@@ -278,13 +280,11 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir)
}
-
- client.close()
}
@Test
def testAlterReplicaLogDirs(): Unit = {
- val adminClient = AdminClient.create(createConfig())
+ client = AdminClient.create(createConfig())
val topic = "topic"
val tp = new TopicPartition(topic, 0)
val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap
@@ -300,23 +300,21 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
}.toMap
// Verify that replica can be created in the specified log directory
- adminClient.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).values().asScala.values.foreach { future =>
- try {
- future.get()
- fail("Future should fail with ReplicaNotAvailableException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ReplicaNotAvailableException])
- }
+ val futures = client.alterReplicaLogDirs(firstReplicaAssignment.asJava,
+ new AlterReplicaLogDirsOptions).values.asScala.values
+ futures.foreach { future =>
+ val exception = intercept[ExecutionException](future.get)
+ assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
}
- TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties())
+ TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties)
servers.foreach { server =>
val logDir = server.logManager.getLog(tp).get.dir.getParent
assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
}
// Verify that replica can be moved to the specified log directory after the topic has been created
- adminClient.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).all().get()
+ client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
servers.foreach { server =>
TestUtils.waitUntilTrue(() => {
val logDir = server.logManager.getLog(tp).get.dir.getParent
@@ -326,48 +324,45 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
// Verify that replica can be moved to the specified log directory while the producer is sending messages
val running = new AtomicBoolean(true)
- @volatile var numMessages = 0
- val thread = new Thread() {
- override def run(): Unit = {
- val producer = TestUtils.createNewProducer(
- TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
- securityProtocol = securityProtocol,
- trustStoreFile = trustStoreFile,
- retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
- requestTimeoutMs = 2000,
- acks = -1
- )
-
- while (running.get()) {
+ val numMessages = new AtomicInteger
+ import scala.concurrent.ExecutionContext.Implicits._
+ val producerFuture = Future {
+ val producer = TestUtils.createNewProducer(
+ TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
+ securityProtocol = securityProtocol,
+ trustStoreFile = trustStoreFile,
+ retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
+ requestTimeoutMs = 10000,
+ acks = -1
+ )
+ try {
+ while (running.get) {
val future = producer.send(new ProducerRecord(topic, s"xxxxxxxxxxxxxxxxxxxx-$numMessages".getBytes))
- numMessages += 1
- future.get()
+ numMessages.incrementAndGet()
+ future.get(10, TimeUnit.SECONDS)
}
- producer.close()
- }
+ numMessages.get
+ } finally producer.close()
}
try {
- thread.start()
- TestUtils.waitUntilTrue(() => numMessages > 100, "timed out waiting for message produce", 6000L)
- adminClient.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).all().get()
+ TestUtils.waitUntilTrue(() => numMessages.get > 100, "timed out waiting for message produce", 6000L)
+ client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
servers.foreach { server =>
TestUtils.waitUntilTrue(() => {
val logDir = server.logManager.getLog(tp).get.dir.getParent
firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
}, "timed out waiting for replica movement", 6000L)
}
- } finally {
- running.set(false)
- thread.join()
- }
+ } finally running.set(false)
+
+ val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
// Verify that all messages that are produced can be consumed
- val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, numMessages, securityProtocol, trustStoreFile)
+ val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, securityProtocol, trustStoreFile)
consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
- assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value()))
+ assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value))
}
- adminClient.close()
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8ad90b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0a7e631..85a5596 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -81,7 +81,7 @@ object ZooKeeperTestHarness {
*/
@BeforeClass
def setUpClass() {
- verifyNoUnexpectedThreads()
+ verifyNoUnexpectedThreads("@BeforeClass")
}
/**
@@ -89,18 +89,18 @@ object ZooKeeperTestHarness {
*/
@AfterClass
def tearDownClass() {
- verifyNoUnexpectedThreads()
+ verifyNoUnexpectedThreads("@AfterClass")
}
/**
* Verifies that threads which are known to cause transient failures in subsequent tests
* have been shutdown.
*/
- def verifyNoUnexpectedThreads() {
+ def verifyNoUnexpectedThreads(context: String) {
def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
}
- assertTrue(s"Found unexpected threads, allThreads=$threads", noUnexpected)
+ assertTrue(s"Found unexpected threads during $context, allThreads=$threads", noUnexpected)
}
}