Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/06 09:51:03 UTC

[kafka] branch trunk updated: KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65d262c  KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)
65d262c is described below

commit 65d262cc30315ed47bd17e94c0db672de93d908a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Feb 6 01:50:51 2018 -0800

    KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)
    
    Add locking to access AbstractFetcherThread#partitionStates during dynamic thread update. Also make testing of thread updates that trigger retries more resilient.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
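
    [Editorial note] The locking half of this change follows the usual snapshot-under-lock pattern: state that the
    fetcher thread mutates while holding partitionMapLock is only ever exported as an immutable copy taken under
    that same lock. A minimal, self-contained Scala sketch of the pattern (illustrative names only, not Kafka's
    actual classes):

        import java.util.concurrent.locks.ReentrantLock
        import scala.collection.mutable

        // Illustrative only: not Kafka's AbstractFetcherThread, just the locking pattern it uses.
        object LockedSnapshotSketch {
          final case class Offset(value: Long)

          class FetcherLike {
            private val stateLock = new ReentrantLock()              // guards `offsets`
            private val offsets = mutable.Map.empty[String, Offset]  // mutated by the fetch loop

            private def inLock[T](body: => T): T = {
              stateLock.lock()
              try body finally stateLock.unlock()
            }

            def update(partition: String, offset: Long): Unit = inLock {
              offsets(partition) = Offset(offset)
            }

            // Snapshot taken under the same lock the fetch loop uses, so a concurrent
            // thread-pool resize never observes a half-updated map; callers receive an
            // immutable copy they can iterate without further synchronization.
            def partitionsAndOffsets: Map[String, Offset] = inLock(offsets.toMap)
          }

          def main(args: Array[String]): Unit = {
            val fetcher = new FetcherLike
            fetcher.update("topic-0", 42L)
            println(fetcher.partitionsAndOffsets)  // Map(topic-0 -> Offset(42))
          }
        }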
---
 .../kafka/server/AbstractFetcherManager.scala      |  4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |  6 ++
 .../server/DynamicBrokerReconfigurationTest.scala  | 92 ++++++++++++++--------
 3 files changed, 66 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 6d88d8d..312123c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -70,9 +70,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       fetcherThreadMap.foreach { case (id, thread) =>
-        val removedPartitions = thread.partitionStates.partitionStates.asScala.map { case state =>
-          state.topicPartition -> new BrokerAndInitialOffset(thread.sourceBroker, state.value.fetchOffset)
-        }.toMap
+        val removedPartitions = thread.partitionsAndOffsets
         removeFetcherForPartitions(removedPartitions.keySet)
         if (id.fetcherId >= newSize)
           thread.shutdown()
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 925c330..39a7032 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -312,6 +312,12 @@ abstract class AbstractFetcherThread(name: String,
     finally partitionMapLock.unlock()
   }
 
+  private[server] def partitionsAndOffsets: Map[TopicPartition, BrokerAndInitialOffset] = inLock(partitionMapLock) {
+    partitionStates.partitionStates.asScala.map { case state =>
+      state.topicPartition -> new BrokerAndInitialOffset(sourceBroker, state.value.fetchOffset)
+    }.toMap
+  }
+
 }
 
 object AbstractFetcherThread {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b7f0ae8..cb2ac52 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,6 +69,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private val servers = new ArrayBuffer[KafkaServer]
   private val numServers = 3
+  private val numPartitions = 10
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
   private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
   private val adminClients = new ArrayBuffer[AdminClient]()
@@ -122,7 +123,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
       replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
-    TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
+    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
 
     TestMetricsReporter.testReporters.clear()
@@ -203,7 +204,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   @Test
   def testKeyStoreAlter(): Unit = {
     val topic2 = "testtopic2"
-    TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
+    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
 
     // Start a producer and consumer that work with the current truststore.
     // This should continue working while changes are made
@@ -241,7 +242,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Verify that all messages sent with retries=0 while keystores were being altered were consumed
-    stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+    stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
   @Test
@@ -282,7 +283,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
 
     // Verify that produce/consume worked throughout this test without any retries in producer
-    stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+    stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
   @Test
@@ -370,7 +371,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
 
     // Verify that produce/consume worked throughout this test without any retries in producer
-    stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+    stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
   @Test
@@ -418,9 +419,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
       maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
     }
-    def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayFailRequests: Boolean): Unit = {
+    def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
       maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
-      val numRetries = if (mayFailRequests) 100 else 0
+      val numRetries = if (mayReceiveDuplicates) 100 else 0
       val (producerThread, consumerThread) = startProduceConsume(numRetries)
       var threadPoolSize = currentSize
       (1 to 2).foreach { _ =>
@@ -429,20 +430,20 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
         threadPoolSize = increasePoolSize(propName, threadPoolSize, threadPrefix)
         Thread.sleep(100)
       }
-      stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests)
+      stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
     }
 
     val config = servers.head.config
     verifyThreadPoolResize(KafkaConfig.NumIoThreadsProp, config.numIoThreads,
-      requestHandlerPrefix, mayFailRequests = false)
-    verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
-      networkThreadPrefix, mayFailRequests = true)
+      requestHandlerPrefix, mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumReplicaFetchersProp, config.numReplicaFetchers,
-      fetcherThreadPrefix, mayFailRequests = false)
+      fetcherThreadPrefix, mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.BackgroundThreadsProp, config.backgroundThreads,
-      "kafka-scheduler-", mayFailRequests = false)
+      "kafka-scheduler-", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumRecoveryThreadsPerDataDirProp, config.numRecoveryThreadsPerDataDir,
-      "", mayFailRequests = false)
+      "", mayReceiveDuplicates = false)
+    verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
+      networkThreadPrefix, mayReceiveDuplicates = true)
   }
 
   @Test
@@ -1055,17 +1056,16 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread: ConsumerThread,
-                                          mayFailRequests: Boolean = false): Unit = {
+                                          mayReceiveDuplicates: Boolean = false): Unit = {
     TestUtils.waitUntilTrue(() => producerThread.sent >= 10, "Messages not sent")
     producerThread.shutdown()
     consumerThread.initiateShutdown()
     consumerThread.awaitShutdown()
-    if (!mayFailRequests)
-      assertEquals(producerThread.sent, consumerThread.received)
-    else {
-      assertTrue(s"Some messages not received, sent=${producerThread.sent} received=${consumerThread.received}",
-        consumerThread.received >= producerThread.sent)
-    }
+    assertEquals(producerThread.lastSent, consumerThread.lastReceived)
+    assertEquals(0, consumerThread.missingRecords.size)
+    if (!mayReceiveDuplicates)
+      assertFalse("Duplicates not expected", consumerThread.duplicates)
+    assertFalse("Some messages received out of order", consumerThread.outOfOrder)
   }
 
   private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = {
@@ -1128,32 +1128,58 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
     private val producer = createProducer(trustStoreFile1, retries, clientId)
+    val lastSent = new ConcurrentHashMap[Int, Int]()
     @volatile var sent = 0
     override def doWork(): Unit = {
-        try {
-            while (isRunning) {
-                sent += 1
-                val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
-                producer.send(record).get(10, TimeUnit.SECONDS)
-              }
-          } finally {
-            producer.close()
-          }
+      try {
+        while (isRunning) {
+          val key = sent.toString
+          val partition = sent % numPartitions
+          val record = new ProducerRecord(topic, partition, key, s"value$sent")
+          producer.send(record).get(10, TimeUnit.SECONDS)
+          lastSent.put(partition, sent)
+          sent += 1
+        }
+      } finally {
+        producer.close()
       }
+    }
   }
 
   private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", isInterruptible = false) {
     private val consumer = createConsumer("group1", trustStoreFile1)
+    val lastReceived = new ConcurrentHashMap[Int, Int]()
+    val missingRecords = new ConcurrentLinkedQueue[Int]()
+    @volatile var outOfOrder = false
+    @volatile var duplicates = false
     @volatile var lastBatch: ConsumerRecords[String, String] = _
     @volatile private var endTimeMs = Long.MaxValue
-    var received = 0
+    @volatile var received = 0
     override def doWork(): Unit = {
       try {
-        while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+        while (isRunning || (lastReceived != producerThread.lastSent && System.currentTimeMillis < endTimeMs)) {
           val records = consumer.poll(50)
           received += records.count
-          if (!records.isEmpty)
+          if (!records.isEmpty) {
             lastBatch = records
+            records.partitions.asScala.foreach { tp =>
+              val partition = tp.partition
+              records.records(tp).asScala.map(_.key.toInt).foreach { key =>
+                val prevKey = lastReceived.asScala.get(partition).getOrElse(partition - numPartitions)
+                val expectedKey = prevKey + numPartitions
+                if (key < prevKey)
+                  outOfOrder = true
+                else if (key == prevKey)
+                  duplicates = true
+                else {
+                  for (i <- expectedKey until key by numPartitions)
+                    missingRecords.add(expectedKey)
+                }
+                lastReceived.put(partition, key)
+                missingRecords.remove(key)
+              }
+            }
+          }
         }
       } finally {
         consumer.close()

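    [Editorial note] For the test-resilience half of the change, the reworked producer writes key n to partition
    n % numPartitions, so within a partition consecutive keys differ by exactly numPartitions; comparing each key
    against the previous one seen for that partition lets the consumer classify it as in-order, a duplicate,
    out-of-order, or evidence of a gap. A hedged, standalone sketch of that classification idea (names are
    illustrative, not the test's actual helpers):

        // Illustrative sketch of the per-partition sequencing check; not the test's real code.
        object SequenceCheckSketch {
          sealed trait Verdict
          case object InOrder extends Verdict
          case object Duplicate extends Verdict
          case object OutOfOrder extends Verdict
          final case class Gap(missing: Seq[Int]) extends Verdict

          def classify(prevKey: Option[Int], key: Int, partition: Int, numPartitions: Int): Verdict = {
            // If nothing was seen yet, pretend the "previous" key was one stride before the
            // first key this partition can carry, so the first expected key is `partition`.
            val prev = prevKey.getOrElse(partition - numPartitions)
            val expected = prev + numPartitions
            if (key < prev) OutOfOrder
            else if (key == prev) Duplicate
            else if (key == expected) InOrder
            else Gap(expected until key by numPartitions)  // every key skipped on this partition
          }

          def main(args: Array[String]): Unit = {
            val numPartitions = 10
            println(classify(None, 3, partition = 3, numPartitions))       // InOrder
            println(classify(Some(3), 23, partition = 3, numPartitions))   // Gap(Range(13))
            println(classify(Some(23), 23, partition = 3, numPartitions))  // Duplicate
          }
        }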