Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/09 13:16:52 UTC

[kafka] branch trunk updated: KAFKA-8748: Fix flaky testDescribeLogDirsRequest (#7182)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 600cc48  KAFKA-8748: Fix flaky testDescribeLogDirsRequest (#7182)
600cc48 is described below

commit 600cc48fa56ca675421eaf383929e89a04b529f1
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Fri Aug 9 06:16:32 2019 -0700

    KAFKA-8748: Fix flaky testDescribeLogDirsRequest (#7182)
    
    The introduction of KIP-480 (Sticky Producer Partitioner) had the
    side effect that generateAndProduceMessages often writes messages
    to fewer partitions, because the sticky partitioner keeps keyless
    records on one partition to improve batching.
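
    A rough sketch (not part of this commit) of the keyless records the
    helper used to produce; the topic name below is just a placeholder.
    With the sticky partitioner, every record in a batch can end up on
    the same partition:

        import org.apache.kafka.clients.producer.ProducerRecord

        // Keyless records: the default partitioner (post KIP-480) picks one
        // "sticky" partition and keeps using it until the batch is full, so a
        // small run of messages may touch only a few partitions.
        val values = (0 until 10).map(x => s"test-$x")
        val keylessRecords = values.map { v =>
          new ProducerRecord[Array[Byte], Array[Byte]]("some-topic", v.getBytes)
        }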
    
    testDescribeLogDirsRequest (and potentially other tests) relies
    on the messages being written somewhat uniformly to the topic
    partitions. We fix the issue by including a monotonically
    increasing key in the produced messages.
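
    As a minimal illustration of why this works (assuming the default
    partitioner's murmur2-based placement for keyed records; the topic
    name and partition count below are placeholders):

        import org.apache.kafka.common.serialization.IntegerSerializer
        import org.apache.kafka.common.utils.Utils

        val topic = "some-topic"   // placeholder topic name
        val numPartitions = 3      // placeholder partition count
        val intSerializer = new IntegerSerializer()

        // For keyed records the default partitioner hashes the serialized key
        // with murmur2 and takes it modulo the partition count, so
        // monotonically increasing keys tend to spread roughly uniformly
        // over the available partitions.
        val partitions = (0 until 9).map { i =>
          val keyBytes = intSerializer.serialize(topic, i)
          (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions
        }
        println(partitions.distinct.sorted)  // typically more than one partition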
    
    I also included a couple of minor clean-ups I noticed while
    debugging the issue.
    
    Before the change, the test failed very frequently when executed
    locally; after the change, it passed 100 consecutive runs.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala  | 3 ++-
 core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala           | 5 +----
 core/src/test/scala/unit/kafka/utils/TestUtils.scala               | 7 +++++--
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index a1e4c73..5e2707a 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -57,7 +57,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
     val log1 = servers.head.logManager.getLog(tp1).get
     assertEquals(log0.size, replicaInfo0.size)
     assertEquals(log1.size, replicaInfo1.size)
-    assertTrue(servers.head.logManager.getLog(tp0).get.logEndOffset > 0)
+    val logEndOffset = servers.head.logManager.getLog(tp0).get.logEndOffset
+    assertTrue(s"LogEndOffset '$logEndOffset' should be > 0", logEndOffset > 0)
     assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0, log0.logEndOffset, false), replicaInfo0.offsetLag)
     assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp1, log1.logEndOffset, false), replicaInfo1.offsetLag)
   }
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index fb7b07e..65ff4ba 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -155,10 +155,7 @@ object JaasTestUtils {
   val serviceName = "kafka"
 
   def saslConfigs(saslProperties: Option[Properties]): Properties = {
-    val result = saslProperties match {
-      case Some(properties) => properties
-      case None => new Properties
-    }
+    val result = saslProperties.getOrElse(new Properties)
     // IBM Kerberos module doesn't support the serviceName JAAS property, hence it needs to be
     // passed as a Kafka property
     if (Java.isIbmJdk && !result.contains(KafkaConfig.SaslKerberosServiceNameProp))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ffaf47b..960ab4c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@@ -1065,7 +1065,10 @@ object TestUtils extends Logging {
                                  numMessages: Int,
                                  acks: Int = -1): Seq[String] = {
     val values = (0 until numMessages).map(x =>  s"test-$x")
-    val records = values.map(v => new ProducerRecord[Array[Byte], Array[Byte]](topic, v.getBytes))
+    val intSerializer = new IntegerSerializer()
+    val records = values.zipWithIndex.map { case (v, i) =>
+      new ProducerRecord(topic, intSerializer.serialize(topic, i), v.getBytes)
+    }
     produceMessages(servers, records, acks)
     values
   }