You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/09 13:16:52 UTC
[kafka] branch trunk updated: KAFKA-8748: Fix flaky
testDescribeLogDirsRequest (#7182)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 600cc48 KAFKA-8748: Fix flaky testDescribeLogDirsRequest (#7182)
600cc48 is described below
commit 600cc48fa56ca675421eaf383929e89a04b529f1
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Fri Aug 9 06:16:32 2019 -0700
KAFKA-8748: Fix flaky testDescribeLogDirsRequest (#7182)
The introduction of KIP-480: Sticky Producer Partitioner had the
side effect that generateAndProduceMessages can often write
messages to a lower number of partitions to improve batching.
testDescribeLogDirsRequest (and potentially other tests) relies
on the messages being written somewhat uniformly to the topic
partitions. We fix the issue by including a monotonically
increasing key in the produced messages.
I also included a couple of minor clean-ups I noticed while
debugging the issue.
The test failed very frequently when executed locally before the
change and it passed 100 times consecutively after the change.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala | 3 ++-
core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala | 5 +----
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 7 +++++--
3 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index a1e4c73..5e2707a 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -57,7 +57,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
val log1 = servers.head.logManager.getLog(tp1).get
assertEquals(log0.size, replicaInfo0.size)
assertEquals(log1.size, replicaInfo1.size)
- assertTrue(servers.head.logManager.getLog(tp0).get.logEndOffset > 0)
+ val logEndOffset = servers.head.logManager.getLog(tp0).get.logEndOffset
+ assertTrue(s"LogEndOffset '$logEndOffset' should be > 0", logEndOffset > 0)
assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0, log0.logEndOffset, false), replicaInfo0.offsetLag)
assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp1, log1.logEndOffset, false), replicaInfo1.offsetLag)
}
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index fb7b07e..65ff4ba 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -155,10 +155,7 @@ object JaasTestUtils {
val serviceName = "kafka"
def saslConfigs(saslProperties: Option[Properties]): Properties = {
- val result = saslProperties match {
- case Some(properties) => properties
- case None => new Properties
- }
+ val result = saslProperties.getOrElse(new Properties)
// IBM Kerberos module doesn't support the serviceName JAAS property, hence it needs to be
// passed as a Kafka property
if (Java.isIbmJdk && !result.contains(KafkaConfig.SaslKerberosServiceNameProp))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ffaf47b..960ab4c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.record._
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@@ -1065,7 +1065,10 @@ object TestUtils extends Logging {
numMessages: Int,
acks: Int = -1): Seq[String] = {
val values = (0 until numMessages).map(x => s"test-$x")
- val records = values.map(v => new ProducerRecord[Array[Byte], Array[Byte]](topic, v.getBytes))
+ val intSerializer = new IntegerSerializer()
+ val records = values.zipWithIndex.map { case (v, i) =>
+ new ProducerRecord(topic, intSerializer.serialize(topic, i), v.getBytes)
+ }
produceMessages(servers, records, acks)
values
}