Posted to commits@kafka.apache.org by ma...@apache.org on 2024/03/28 05:40:42 UTC

(kafka) 02/02: Revert "KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records (#15542)"

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 64f7a0a300c70b00c0cc357d0a936ea8b42b69fb
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Thu Mar 28 11:08:50 2024 +0530

    Revert "KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records (#15542)"
    
    This reverts commit 8aa39869aae388abf9e3d73e364cb17e2dd6a8f8.
---
 .../kafka/common/record/MemoryRecordsBuilder.java  |  29 ++-
 .../common/record/MemoryRecordsBuilderTest.java    |  22 +--
 .../kafka/common/record/MemoryRecordsTest.java     |   5 +-
 .../kafka/admin/ListOffsetsIntegrationTest.scala   | 218 +++------------------
 .../kafka/server/QuorumTestHarness.scala           |   2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  27 ++-
 .../kafka/storage/internals/log/LogValidator.java  |  13 +-
 7 files changed, 81 insertions(+), 235 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index de03030d82e..3e9360f04ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -242,23 +242,34 @@ public class MemoryRecordsBuilder implements AutoCloseable {
 
     /**
      * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
-     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
      *
-     * If the log append time is used, the offset will be the first offset of the record.
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
      *
-     * If create time is used, the offset will always be the offset of the record with the max timestamp.
-     *
-     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            return new RecordsInfo(logAppendTime, baseOffset);
+            long shallowOffsetOfMaxTimestamp;
+            // Use the last offset when dealing with record batches
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                shallowOffsetOfMaxTimestamp = lastOffset;
+            else
+                shallowOffsetOfMaxTimestamp = baseOffset;
+            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
-            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
-            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
+            long shallowOffsetOfMaxTimestamp;
+            // Use the last offset when dealing with record batches
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                shallowOffsetOfMaxTimestamp = lastOffset;
+            else
+                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
+            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
         }
     }
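
For readers skimming the hunk above: the reverted javadoc and info() body boil down to a single offset-selection rule. A minimal standalone sketch of that rule follows (class and parameter names are illustrative only, not the real MemoryRecordsBuilder API; the NO_TIMESTAMP / magic v0 case, which always reports the last offset, is omitted):

// Sketch of the offset selection restored by this revert. Purely illustrative;
// the authoritative code is MemoryRecordsBuilder#info() in the hunk above.
final class ShallowOffsetSketch {
    static long shallowOffsetOfMaxTimestamp(boolean compressed, byte magic,
                                            long baseOffset, long lastOffset,
                                            long offsetOfMaxTimestamp,
                                            boolean logAppendTime) {
        // Record batches (compressed, or magic >= 2) always report the batch's last offset.
        if (compressed || magic >= 2)
            return lastOffset;
        // Uncompressed v0/v1: log-append time reports the first offset of the set,
        // create time reports the offset of the record carrying the max timestamp.
        return logAppendTime ? baseOffset : offsetOfMaxTimestamp;
    }
}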
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 0923baea01e..4f3f03c3f2d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -378,8 +378,10 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
 
-        // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
-        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1)
+            assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        else
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -413,11 +415,10 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2L, info.maxTimestamp);
         }
 
-        if (magic == MAGIC_VALUE_V0)
-            // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
-        else
+        if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1)
             assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+        else
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -494,13 +495,12 @@ public class MemoryRecordsBuilderTest {
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
-        if (magic == MAGIC_VALUE_V0) {
+        if (magic == MAGIC_VALUE_V0)
             assertEquals(-1, info.maxTimestamp);
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
-        } else {
+        else
             assertEquals(2L, info.maxTimestamp);
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
-        }
+
+        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
         long i = 0L;
         for (RecordBatch batch : records.batches()) {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 50821af841c..3f0195bf5d1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -893,7 +893,10 @@ public class MemoryRecordsTest {
         assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp());
-            assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+            if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
+                assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+            else
+                assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
         }
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index ad1718c6a43..bcb9641e9e8 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -19,235 +19,77 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.utils.{MockTime, Time, Utils}
+import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
-import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
-  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
-  var setOldMessageFormat: Boolean = false
-  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopicWithConfig(topicName, new Properties())
+    createTopic(topicName, 1, 1.toShort)
+    produceMessages()
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }
 
-  override def brokerTime(brokerId: Int): Time = mockTime
-
   @AfterEach
   override def tearDown(): Unit = {
-    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
-    produceMessagesInOneBatch("gzip")
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this one batch test, it'll be the first offset 0
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
-    produceMessagesInSeparateBatch()
-    verifyListOffsets()
-  }
-
-  // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = {
-    createOldMessageFormatBrokers()
-    produceMessagesInOneBatch()
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this one batch test, it'll be the first offset 0
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
-  }
-
-  // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
-    createOldMessageFormatBrokers()
-    produceMessagesInSeparateBatch()
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this separate batch test, it'll be the last offset 2
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
-    verifyListOffsets(topic = topicNameWithCustomConfigs)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
-    verifyListOffsets(topic = topicNameWithCustomConfigs)
-  }
-
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeCompressedRecordsInSeparateBatch(quorum: String): Unit = {
-    produceMessagesInSeparateBatch("gzip")
-    verifyListOffsets()
-
-    // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
-    produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
-    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
-    // So in this separate batch test, it'll be the last offset 2
-    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
-  }
-
-  private def createOldMessageFormatBrokers(): Unit = {
-    setOldMessageFormat = true
-    recreateBrokers(reconfigure = true, startup = true)
-    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
-    adminClient = Admin.create(Map[String, Object](
-      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
-    ).asJava)
-  }
-
-  private def createTopicWithConfig(topic: String, props: Properties): Unit = {
-    createTopic(topic, 1, 1.toShort, topicConfig = props)
-  }
-
-  private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
     assertEquals(0, earliestOffset.offset())
+  }
 
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
     assertEquals(3, latestOffset.offset())
+  }
 
-    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
-    assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1, maxTimestampOffset.offset())
   }
 
   private def runFetchOffsets(adminClient: Admin,
-                              offsetSpec: OffsetSpec,
-                              topic: String): ListOffsetsResult.ListOffsetsResultInfo = {
-    val tp = new TopicPartition(topic, 0)
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    val tp = new TopicPartition(topicName, 0)
     adminClient.listOffsets(Map(
       tp -> offsetSpec
     ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  private def produceMessagesInOneBatch(compressionType: String = "none", topic: String = topicName): Unit = {
+  def produceMessages(): Unit = {
     val records = Seq(
-      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
-        null, new Array[Byte](10)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
-        null, new Array[Byte](10)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
-        null, new Array[Byte](10)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
+        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
+        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
+        null, new Array[Byte](10000)),
     )
-    // create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records),
-    // so that we can confirm all records will be accumulated in producer until we flush them into one batch.
-    val producer = createProducer(
-      plaintextBootstrapServers(brokers),
-      deliveryTimeoutMs = Int.MaxValue,
-      lingerMs = Int.MaxValue,
-      compressionType = compressionType)
-
-    try {
-      val futures = records.map(producer.send)
-      producer.flush()
-      futures.foreach(_.get)
-    } finally {
-      producer.close()
-    }
+    TestUtils.produceMessages(servers, records, -1)
   }
 
-  private def produceMessagesInSeparateBatch(compressionType: String = "none", topic: String = topicName): Unit = {
-    val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
-      null, new Array[Byte](10)))
-    val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
-      null, new Array[Byte](10)))
-    val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
-      null, new Array[Byte](10)))
-
-    val producer = createProducer(
-      plaintextBootstrapServers(brokers),
-      compressionType = compressionType)
-    try {
-      val futures = records.map(producer.send)
-      futures.foreach(_.get)
-      // advance the server time after each record sent to make sure the time changed when appendTime is used
-      mockTime.sleep(100)
-      val futures2 = records2.map(producer.send)
-      futures2.foreach(_.get)
-      mockTime.sleep(100)
-      val futures3 = records3.map(producer.send)
-      futures3.foreach(_.get)
-    } finally {
-      producer.close()
-    }
-  }
+  def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+}
 
-  def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props =>
-      if (setOldMessageFormat) {
-        props.setProperty("log.message.format.version", "0.10.0")
-        props.setProperty("inter.broker.protocol.version", "0.10.0")
-      }
-      props
-    }.map(KafkaConfig.fromProps)
-  }
-}
\ No newline at end of file
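
The test methods that remain after the revert exercise ListOffsets through the admin client. For anyone reproducing them by hand against a running broker, a minimal Java equivalent of the max-timestamp lookup might look like the following (bootstrap address and topic name are placeholders):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class MaxTimestampOffsetLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("foo", 0);
            ListOffsetsResultInfo info = admin
                .listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()))
                .partitionResult(tp)
                .get();
            System.out.println("offset of max timestamp = " + info.offset()
                + ", timestamp = " + info.timestamp());
        }
    }
}

Note that with the reverted behavior, compressed batches report the batch's last offset for MAX_TIMESTAMP rather than the offset of the record that actually carries the max timestamp, which is the behavior KAFKA-16342 set out to change.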
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index f2c3a717b7f..0a2cf270bf9 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -98,7 +98,7 @@ class KRaftQuorumImplementation(
   ): KafkaBroker = {
     val sharedServer = new SharedServer(config,
       new MetaProperties(clusterId, config.nodeId),
-      time,
+      Time.SYSTEM,
       new Metrics(),
       controllerQuorumVotersFuture,
       faultHandlerFactory)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 5ffb037aff8..6b781f6fa69 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -219,8 +219,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be 0 if logAppendTime is used")
+    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
 
@@ -271,8 +271,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be 0 if logAppendTime is used")
+    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
 
@@ -341,7 +341,6 @@ class LogValidatorTest {
 
   private def checkNonCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
-    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -421,7 +420,6 @@ class LogValidatorTest {
 
   private def checkRecompression(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
-    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -475,8 +473,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
-      "Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs,
+      "Offset of max timestamp should be 2")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
 
@@ -527,8 +525,8 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be -1")
+    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -574,8 +572,8 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.")
+    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -589,7 +587,6 @@ class LogValidatorTest {
 
   private def checkCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
-    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -642,8 +639,8 @@ class LogValidatorTest {
       }
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
 
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index c8fd50e468d..a05512a0196 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -329,8 +329,6 @@ public class LogValidator {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         LongRef expectedInnerOffset = PrimitiveRef.ofLong(0);
         List<Record> validatedRecords = new ArrayList<>();
-        long offsetOfMaxTimestamp = -1;
-        long initialOffset = offsetCounter.value;
 
         int uncompressedSizeInBytes = 0;
 
@@ -380,11 +378,8 @@ public class LogValidator {
                             && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                             && toMagic > RecordBatch.MAGIC_VALUE_V0) {
 
-                        if (record.timestamp() > maxTimestamp) {
+                        if (record.timestamp() > maxTimestamp)
                             maxTimestamp = record.timestamp();
-                            // The offset is only increased when it is a valid record
-                            offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
-                        }
 
                         // Some older clients do not implement the V1 internal offsets correctly.
                         // Historically the broker handled this by rewriting the batches rather
@@ -421,10 +416,8 @@ public class LogValidator {
             long lastOffset = offsetCounter.value - 1;
             firstBatch.setLastOffset(lastOffset);
 
-            if (timestampType == TimestampType.LOG_APPEND_TIME) {
+            if (timestampType == TimestampType.LOG_APPEND_TIME)
                 maxTimestamp = now;
-                offsetOfMaxTimestamp = initialOffset;
-            }
 
             if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
                 firstBatch.setMaxTimestamp(timestampType, maxTimestamp);
@@ -437,7 +430,7 @@ public class LogValidator {
                 now,
                 records,
                 maxTimestamp,
-                offsetOfMaxTimestamp,
+                lastOffset,
                 false,
                 recordConversionStats);
         }
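
Putting the LogValidator hunk in plainer terms: the compressed re-validation path now tracks only the max timestamp itself and attaches the batch's last assigned offset to it. A rough standalone sketch of the reported pair follows (illustrative names, not the real LogValidator types; the magic v0 / down-conversion details are omitted):

import java.util.List;

final class CompressedValidationSketch {
    // Returns {maxTimestamp, shallowOffsetOfMaxTimestamp} as reported after this revert.
    static long[] report(List<Long> recordTimestamps, long firstAssignedOffset,
                         boolean logAppendTime, long now) {
        long maxTimestamp = -1L; // i.e. RecordBatch.NO_TIMESTAMP
        for (long ts : recordTimestamps)
            if (ts > maxTimestamp)
                maxTimestamp = ts;
        long lastOffset = firstAssignedOffset + recordTimestamps.size() - 1;
        if (logAppendTime)
            maxTimestamp = now;
        // The offset reported alongside the max timestamp is always the last offset of
        // the batch; the per-record offsetOfMaxTimestamp bookkeeping was removed above.
        return new long[] {maxTimestamp, lastOffset};
    }
}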