You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original HTML message and is not available in this plain text rendering.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/06 09:51:03 UTC
[kafka] branch trunk updated: KAFKA-6528: Fix transient test
failure in testThreadPoolResize (#4526)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65d262c KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)
65d262c is described below
commit 65d262cc30315ed47bd17e94c0db672de93d908a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Feb 6 01:50:51 2018 -0800
KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)
Add locking to access AbstractFetcherThread#partitionStates during dynamic thread update. Also make testing of thread updates that trigger retries more resilient.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/server/AbstractFetcherManager.scala | 4 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 6 ++
.../server/DynamicBrokerReconfigurationTest.scala | 92 ++++++++++++++--------
3 files changed, 66 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 6d88d8d..312123c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -70,9 +70,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = {
fetcherThreadMap.foreach { case (id, thread) =>
- val removedPartitions = thread.partitionStates.partitionStates.asScala.map { case state =>
- state.topicPartition -> new BrokerAndInitialOffset(thread.sourceBroker, state.value.fetchOffset)
- }.toMap
+ val removedPartitions = thread.partitionsAndOffsets
removeFetcherForPartitions(removedPartitions.keySet)
if (id.fetcherId >= newSize)
thread.shutdown()
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 925c330..39a7032 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -312,6 +312,12 @@ abstract class AbstractFetcherThread(name: String,
finally partitionMapLock.unlock()
}
+ private[server] def partitionsAndOffsets: Map[TopicPartition, BrokerAndInitialOffset] = inLock(partitionMapLock) {
+ partitionStates.partitionStates.asScala.map { case state =>
+ state.topicPartition -> new BrokerAndInitialOffset(sourceBroker, state.value.fetchOffset)
+ }.toMap
+ }
+
}
object AbstractFetcherThread {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b7f0ae8..cb2ac52 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,6 +69,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private val servers = new ArrayBuffer[KafkaServer]
private val numServers = 3
+ private val numPartitions = 10
private val producers = new ArrayBuffer[KafkaProducer[String, String]]
private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
private val adminClients = new ArrayBuffer[AdminClient]()
@@ -122,7 +123,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
- TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
+ TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
createAdminClient(SecurityProtocol.SSL, SecureInternal)
TestMetricsReporter.testReporters.clear()
@@ -203,7 +204,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
@Test
def testKeyStoreAlter(): Unit = {
val topic2 = "testtopic2"
- TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
+ TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
// Start a producer and consumer that work with the current truststore.
// This should continue working while changes are made
@@ -241,7 +242,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyProduceConsume(producer, consumer, 10, topic2)
// Verify that all messages sent with retries=0 while keystores were being altered were consumed
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
@@ -282,7 +283,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
// Verify that produce/consume worked throughout this test without any retries in producer
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
@@ -370,7 +371,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
// Verify that produce/consume worked throughout this test without any retries in producer
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
@@ -418,9 +419,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
}
- def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayFailRequests: Boolean): Unit = {
+ def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
- val numRetries = if (mayFailRequests) 100 else 0
+ val numRetries = if (mayReceiveDuplicates) 100 else 0
val (producerThread, consumerThread) = startProduceConsume(numRetries)
var threadPoolSize = currentSize
(1 to 2).foreach { _ =>
@@ -429,20 +430,20 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
threadPoolSize = increasePoolSize(propName, threadPoolSize, threadPrefix)
Thread.sleep(100)
}
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests)
+ stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
}
val config = servers.head.config
verifyThreadPoolResize(KafkaConfig.NumIoThreadsProp, config.numIoThreads,
- requestHandlerPrefix, mayFailRequests = false)
- verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
- networkThreadPrefix, mayFailRequests = true)
+ requestHandlerPrefix, mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumReplicaFetchersProp, config.numReplicaFetchers,
- fetcherThreadPrefix, mayFailRequests = false)
+ fetcherThreadPrefix, mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.BackgroundThreadsProp, config.backgroundThreads,
- "kafka-scheduler-", mayFailRequests = false)
+ "kafka-scheduler-", mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumRecoveryThreadsPerDataDirProp, config.numRecoveryThreadsPerDataDir,
- "", mayFailRequests = false)
+ "", mayReceiveDuplicates = false)
+ verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
+ networkThreadPrefix, mayReceiveDuplicates = true)
}
@Test
@@ -1055,17 +1056,16 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread: ConsumerThread,
- mayFailRequests: Boolean = false): Unit = {
+ mayReceiveDuplicates: Boolean = false): Unit = {
TestUtils.waitUntilTrue(() => producerThread.sent >= 10, "Messages not sent")
producerThread.shutdown()
consumerThread.initiateShutdown()
consumerThread.awaitShutdown()
- if (!mayFailRequests)
- assertEquals(producerThread.sent, consumerThread.received)
- else {
- assertTrue(s"Some messages not received, sent=${producerThread.sent} received=${consumerThread.received}",
- consumerThread.received >= producerThread.sent)
- }
+ assertEquals(producerThread.lastSent, consumerThread.lastReceived)
+ assertEquals(0, consumerThread.missingRecords.size)
+ if (!mayReceiveDuplicates)
+ assertFalse("Duplicates not expected", consumerThread.duplicates)
+ assertFalse("Some messages received out of order", consumerThread.outOfOrder)
}
private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = {
@@ -1128,32 +1128,58 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
private val producer = createProducer(trustStoreFile1, retries, clientId)
+ val lastSent = new ConcurrentHashMap[Int, Int]()
@volatile var sent = 0
override def doWork(): Unit = {
- try {
- while (isRunning) {
- sent += 1
- val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
- producer.send(record).get(10, TimeUnit.SECONDS)
- }
- } finally {
- producer.close()
- }
+ try {
+ while (isRunning) {
+ val key = sent.toString
+ val partition = sent % numPartitions
+ val record = new ProducerRecord(topic, partition, key, s"value$sent")
+ producer.send(record).get(10, TimeUnit.SECONDS)
+ lastSent.put(partition, sent)
+ sent += 1
+ }
+ } finally {
+ producer.close()
}
+ }
}
private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", isInterruptible = false) {
private val consumer = createConsumer("group1", trustStoreFile1)
+ val lastReceived = new ConcurrentHashMap[Int, Int]()
+ val missingRecords = new ConcurrentLinkedQueue[Int]()
+ @volatile var outOfOrder = false
+ @volatile var duplicates = false
@volatile var lastBatch: ConsumerRecords[String, String] = _
@volatile private var endTimeMs = Long.MaxValue
- var received = 0
+ @volatile var received = 0
override def doWork(): Unit = {
try {
- while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+ while (isRunning || (lastReceived != producerThread.lastSent && System.currentTimeMillis < endTimeMs)) {
val records = consumer.poll(50)
received += records.count
- if (!records.isEmpty)
+ if (!records.isEmpty) {
lastBatch = records
+ records.partitions.asScala.foreach { tp =>
+ val partition = tp.partition
+ records.records(tp).asScala.map(_.key.toInt).foreach { key =>
+ val prevKey = lastReceived.asScala.get(partition).getOrElse(partition - numPartitions)
+ val expectedKey = prevKey + numPartitions
+ if (key < prevKey)
+ outOfOrder = true
+ else if (key == prevKey)
+ duplicates = true
+ else {
+ for (i <- expectedKey until key by numPartitions)
+ missingRecords.add(expectedKey)
+ }
+ lastReceived.put(partition, key)
+ missingRecords.remove(key)
+ }
+ }
+ }
}
} finally {
consumer.close()
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.