You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/05/03 02:53:24 UTC

[kafka] branch trunk updated: KAFKA-9589: Enable testLogAppendTimeNonCompressedV2 and fix bug in helper method (#8533)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2aecb08  KAFKA-9589: Enable testLogAppendTimeNonCompressedV2 and fix bug in helper method (#8533)
2aecb08 is described below

commit 2aecb089afe2f5cdc5d36bc91a3f0e39eb7e9a91
Author: Leonard Ge <62...@users.noreply.github.com>
AuthorDate: Sun May 3 03:52:37 2020 +0100

    KAFKA-9589: Enable testLogAppendTimeNonCompressedV2 and fix bug in helper method (#8533)
    
    Adjust `checkLogAppendTimeNonCompressed` to assert
    `shallowOffsetOfMaxTimestamp` correctly for message format 2.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/test/scala/unit/kafka/log/LogValidatorTest.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 7d2738b..41babb7 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -112,6 +112,11 @@ class LogValidatorTest {
     checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
   }
 
+  @Test
+  def testLogAppendTimeNonCompressedV2(): Unit = {
+    checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+  }
+
   private def checkLogAppendTimeNonCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
@@ -135,17 +140,16 @@ class LogValidatorTest {
     assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
     validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch))
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
 
+    // For message format v2 and above, the offset of the max timestamp is the last offset rather than the base offset
+    val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0
+    assertEquals(s"The offset of max timestamp should be $expectedMaxTimestampOffset",
+      expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp)
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
       compressed = false)
   }
 
-  def testLogAppendTimeNonCompressedV2(): Unit = {
-    checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
-  }
-
   @Test
   def testLogAppendTimeWithRecompressionV1(): Unit = {
     checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1)