You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2024/03/23 03:12:34 UTC
(kafka) branch 3.6 updated: KAFKA-16341 fix the LogValidator for non-compressed type (#15570)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 95c14f868a0 KAFKA-16341 fix the LogValidator for non-compressed type (#15570)
95c14f868a0 is described below
commit 95c14f868a03d652fb5b8d22e5008dfd2db09d4a
Author: Johnny Hsu <44...@users.noreply.github.com>
AuthorDate: Sat Mar 23 11:12:27 2024 +0800
KAFKA-16341 fix the LogValidator for non-compressed type (#15570)
- Fix the verifying logic. If it's LOG_APPEND_TIME, we choose the offset of the first record. Else, we choose the record with the maxTimeStamp.
- rename the shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp
Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
.../apache/kafka/common/record/MemoryRecords.java | 24 ++++++------
.../kafka/common/record/MemoryRecordsBuilder.java | 6 +--
.../common/record/MemoryRecordsBuilderTest.java | 10 ++---
.../kafka/common/record/MemoryRecordsTest.java | 6 +--
core/src/main/scala/kafka/log/LocalLog.scala | 5 +--
core/src/main/scala/kafka/log/LogCleaner.scala | 5 +--
core/src/main/scala/kafka/log/UnifiedLog.scala | 2 +-
.../kafka/admin/ListOffsetsIntegrationTest.scala | 45 +++++++++++++++-------
.../test/scala/unit/kafka/log/LocalLogTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 4 +-
.../scala/unit/kafka/log/LogValidatorTest.scala | 24 +++++++-----
.../kafka/storage/internals/log/LogValidator.java | 19 ++++-----
12 files changed, 84 insertions(+), 68 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 888bcdf2cb1..d20dd3469c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -211,8 +211,8 @@ public class MemoryRecords extends AbstractRecords {
partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
MemoryRecordsBuilder.RecordsInfo info = builder.info();
- filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
- maxOffset, retainedRecords.size(), filteredBatchSize);
+ filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.offsetOfMaxTimestamp,
+ maxOffset, retainedRecords.size(), filteredBatchSize);
}
} else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
@@ -400,7 +400,7 @@ public class MemoryRecords extends AbstractRecords {
private int bytesRetained = 0;
private long maxOffset = -1L;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
- private long shallowOffsetOfMaxTimestamp = -1L;
+ private long offsetOfMaxTimestamp = -1L;
private FilterResult(ByteBuffer outputBuffer) {
this.outputBuffer = outputBuffer;
@@ -412,21 +412,21 @@ public class MemoryRecords extends AbstractRecords {
retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
}
- private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
- int messagesRetained, int bytesRetained) {
- validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
+ private void updateRetainedBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset,
+ int messagesRetained, int bytesRetained) {
+ validateBatchMetadata(maxTimestamp, offsetOfMaxTimestamp, maxOffset);
if (maxTimestamp > this.maxTimestamp) {
this.maxTimestamp = maxTimestamp;
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+ this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
}
this.maxOffset = Math.max(maxOffset, this.maxOffset);
this.messagesRetained += messagesRetained;
this.bytesRetained += bytesRetained;
}
- private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
- if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
- throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
+ private void validateBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset) {
+ if (maxTimestamp != RecordBatch.NO_TIMESTAMP && offsetOfMaxTimestamp < 0)
+ throw new IllegalArgumentException("offset undefined for maximum timestamp " + maxTimestamp);
if (maxOffset < 0)
throw new IllegalArgumentException("maxOffset undefined");
}
@@ -459,8 +459,8 @@ public class MemoryRecords extends AbstractRecords {
return maxTimestamp;
}
- public long shallowOffsetOfMaxTimestamp() {
- return shallowOffsetOfMaxTimestamp;
+ public long offsetOfMaxTimestamp() {
+ return offsetOfMaxTimestamp;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index de03030d82e..2d5c008f02a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -851,12 +851,12 @@ public class MemoryRecordsBuilder implements AutoCloseable {
public static class RecordsInfo {
public final long maxTimestamp;
- public final long shallowOffsetOfMaxTimestamp;
+ public final long offsetOfMaxTimestamp;
public RecordsInfo(long maxTimestamp,
- long shallowOffsetOfMaxTimestamp) {
+ long offsetOfMaxTimestamp) {
this.maxTimestamp = maxTimestamp;
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+ this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 0923baea01e..1c27b7d65b9 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -379,7 +379,7 @@ public class MemoryRecordsBuilderTest {
assertEquals(logAppendTime, info.maxTimestamp);
// When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
- assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+ assertEquals(0L, info.offsetOfMaxTimestamp);
for (RecordBatch batch : records.batches()) {
if (magic == MAGIC_VALUE_V0) {
@@ -415,9 +415,9 @@ public class MemoryRecordsBuilderTest {
if (magic == MAGIC_VALUE_V0)
// in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
- assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+ assertEquals(-1L, info.offsetOfMaxTimestamp);
else
- assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+ assertEquals(1L, info.offsetOfMaxTimestamp);
int i = 0;
long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -496,10 +496,10 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder.RecordsInfo info = builder.info();
if (magic == MAGIC_VALUE_V0) {
assertEquals(-1, info.maxTimestamp);
- assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+ assertEquals(-1L, info.offsetOfMaxTimestamp);
} else {
assertEquals(2L, info.maxTimestamp);
- assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+ assertEquals(2L, info.offsetOfMaxTimestamp);
}
long i = 0L;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 50821af841c..9e688fc3ab6 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -352,7 +352,7 @@ public class MemoryRecordsTest {
assertEquals(0, filterResult.messagesRetained());
assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
assertEquals(12, filterResult.maxTimestamp());
- assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp());
+ assertEquals(baseOffset + 1, filterResult.offsetOfMaxTimestamp());
// Verify filtered records
filtered.flip();
@@ -413,7 +413,7 @@ public class MemoryRecordsTest {
assertEquals(0, filterResult.messagesRetained());
assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
assertEquals(timestamp, filterResult.maxTimestamp());
- assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+ assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp());
assertTrue(filterResult.outputBuffer().position() > 0);
// Verify filtered records
@@ -893,7 +893,7 @@ public class MemoryRecordsTest {
assertEquals(filtered.limit(), result.bytesRetained());
if (magic > RecordBatch.MAGIC_VALUE_V0) {
assertEquals(20L, result.maxTimestamp());
- assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+ assertEquals(4L, result.offsetOfMaxTimestamp());
}
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index aaaeb8787e9..f500766b727 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -434,9 +434,8 @@ class LocalLog(@volatile private var _dir: File,
}
}
- private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
- segments.activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
- shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)
+ private[log] def append(lastOffset: Long, largestTimestamp: Long, offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
+ segments.activeSegment.append(lastOffset, largestTimestamp, offsetOfMaxTimestamp, records)
updateLogEndOffset(lastOffset + 1)
}
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index ff8a687b5ee..c013e0a033a 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -812,10 +812,7 @@ private[log] class Cleaner(val id: Int,
val retained = MemoryRecords.readableRecords(outputBuffer)
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
- dest.append(largestOffset = result.maxOffset,
- largestTimestamp = result.maxTimestamp,
- shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
- records = retained)
+ dest.append(result.maxOffset, result.maxTimestamp, result.offsetOfMaxTimestamp, retained)
throttler.maybeThrottle(outputBuffer.limit())
}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e0734da738a..626196a9cd4 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -813,7 +813,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
- appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
+ appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs)
appendInfo.setLastOffset(offset.value - 1)
appendInfo.setRecordConversionStats(validateAndOffsetAssignResult.recordConversionStats)
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index ad1718c6a43..76753cfdc48 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -68,9 +68,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
verifyListOffsets()
// test LogAppendTime case
- val props: Properties = new Properties()
- props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
- createTopicWithConfig(topicNameWithCustomConfigs, props)
+ setUpForLogAppendTimeCase()
produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
// So in this one batch test, it'll be the first offset 0
@@ -79,9 +77,30 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
- def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+ def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+ produceMessagesInOneBatch()
+ verifyListOffsets()
+
+ // test LogAppendTime case
+ setUpForLogAppendTimeCase()
+ produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+ // In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record
+ // thus, the maxTimestampOffset should be the first record of the batch.
+ // So in this one batch test, it'll be the first offset which is 0
+ verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testThreeNonCompressedRecordsInSeparateBatch(quorum: String): Unit = {
produceMessagesInSeparateBatch()
verifyListOffsets()
+
+ // test LogAppendTime case
+ setUpForLogAppendTimeCase()
+ produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+ // In LogAppendTime's case, if the timestamp is different, it should be the last one
+ verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
}
// The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
@@ -93,9 +112,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
verifyListOffsets()
// test LogAppendTime case
- val props: Properties = new Properties()
- props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
- createTopicWithConfig(topicNameWithCustomConfigs, props)
+ setUpForLogAppendTimeCase()
produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
// So in this one batch test, it'll be the first offset 0
@@ -111,9 +128,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
verifyListOffsets()
// test LogAppendTime case
- val props: Properties = new Properties()
- props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
- createTopicWithConfig(topicNameWithCustomConfigs, props)
+ setUpForLogAppendTimeCase()
produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
// So in this separate batch test, it'll be the last offset 2
@@ -147,15 +162,19 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
verifyListOffsets()
// test LogAppendTime case
- val props: Properties = new Properties()
- props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
- createTopicWithConfig(topicNameWithCustomConfigs, props)
+ setUpForLogAppendTimeCase()
produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
// So in this separate batch test, it'll be the last offset 2
verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
}
+ private def setUpForLogAppendTimeCase(): Unit = {
+ val props: Properties = new Properties()
+ props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+ createTopicWithConfig(topicNameWithCustomConfigs, props)
+ }
+
private def createOldMessageFormatBrokers(): Unit = {
setOldMessageFormat = true
recreateBrokers(reconfigure = true, startup = true)
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index 8556bc74fa5..fd8884093ab 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -100,7 +100,7 @@ class LocalLogTest {
initialOffset: Long = 0L): Unit = {
log.append(lastOffset = initialOffset + records.size - 1,
largestTimestamp = records.head.timestamp,
- shallowOffsetOfMaxTimestamp = initialOffset,
+ offsetOfMaxTimestamp = initialOffset,
records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*))
}
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 65088b7dae3..86525e761b0 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -86,10 +86,10 @@ class LogSegmentTest {
def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
val seg = createSegment(baseOffset)
val currentTime = Time.SYSTEM.milliseconds()
- val shallowOffsetOfMaxTimestamp = largestOffset
+ val offsetOfMaxTimestamp = largestOffset
val memoryRecords = records(0, "hello")
assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
- seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
+ seg.append(largestOffset, currentTime, offsetOfMaxTimestamp, memoryRecords)
})
}
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 5ffb037aff8..6f59013a05a 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -173,9 +173,9 @@ class LogValidatorTest {
assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now")
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
- // we index from last offset in version 2 instead of base offset
- val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0
- assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs,
+ // If it's LOG_APPEND_TIME, the offset will be the offset of the first record
+ val expectedMaxTimestampOffset = 0
+ assertEquals(expectedMaxTimestampOffset, validatedResults.offsetOfMaxTimestampMs,
s"The offset of max timestamp should be $expectedMaxTimestampOffset")
verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
compressed = false)
@@ -219,7 +219,7 @@ class LogValidatorTest {
"MessageSet should still valid")
assertEquals(now, validatedResults.maxTimestampMs,
s"Max timestamp should be $now")
- assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+ assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
s"The offset of max timestamp should be 0 if logAppendTime is used")
assertTrue(validatedResults.messageSizeMaybeChanged,
"Message size may have been changed")
@@ -271,7 +271,7 @@ class LogValidatorTest {
"MessageSet should still valid")
assertEquals(now, validatedResults.maxTimestampMs,
s"Max timestamp should be $now")
- assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+ assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
s"The offset of max timestamp should be 0 if logAppendTime is used")
assertFalse(validatedResults.messageSizeMaybeChanged,
"Message size should not have been changed")
@@ -400,7 +400,9 @@ class LogValidatorTest {
assertEquals(i, offsetCounter.value);
assertEquals(now + 1, validatingResults.maxTimestampMs,
s"Max timestamp should be ${now + 1}")
- assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+
+ val expectedOffsetOfMaxTimestamp = 1
+ assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
s"Offset of max timestamp should be 1")
assertFalse(validatingResults.messageSizeMaybeChanged,
"Message size should not have been changed")
@@ -475,7 +477,7 @@ class LogValidatorTest {
}
assertEquals(now + 1, validatingResults.maxTimestampMs,
s"Max timestamp should be ${now + 1}")
- assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+ assertEquals(1, validatingResults.offsetOfMaxTimestampMs,
"Offset of max timestamp should be 1")
assertTrue(validatingResults.messageSizeMaybeChanged,
"Message size should have been changed")
@@ -527,7 +529,7 @@ class LogValidatorTest {
}
assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
- assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
+ assertEquals(-1, validatedResults.offsetOfMaxTimestampMs,
s"Offset of max timestamp should be -1")
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
@@ -574,7 +576,7 @@ class LogValidatorTest {
assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
}
assertEquals(timestamp, validatedResults.maxTimestampMs)
- assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+ assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.")
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
@@ -642,7 +644,9 @@ class LogValidatorTest {
}
}
assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
- assertEquals(1, validatedResults.shallowOffsetOfMaxTimestampMs,
+
+ val expectedOffsetOfMaxTimestamp = 1
+ assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.offsetOfMaxTimestampMs,
s"Offset of max timestamp should be 1")
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index c8fd50e468d..2dcc3fa1035 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -68,17 +68,17 @@ public class LogValidator {
public final long logAppendTimeMs;
public final MemoryRecords validatedRecords;
public final long maxTimestampMs;
- public final long shallowOffsetOfMaxTimestampMs;
+ public final long offsetOfMaxTimestampMs;
public final boolean messageSizeMaybeChanged;
public final RecordConversionStats recordConversionStats;
public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
- long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
- RecordConversionStats recordConversionStats) {
+ long offsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
+ RecordConversionStats recordConversionStats) {
this.logAppendTimeMs = logAppendTimeMs;
this.validatedRecords = validatedRecords;
this.maxTimestampMs = maxTimestampMs;
- this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs;
+ this.offsetOfMaxTimestampMs = offsetOfMaxTimestampMs;
this.messageSizeMaybeChanged = messageSizeMaybeChanged;
this.recordConversionStats = recordConversionStats;
}
@@ -149,7 +149,7 @@ public class LogValidator {
* avoid expensive re-compression.
*
* Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
- * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
+ * of the message with the max timestamp and a boolean indicating whether the message sizes may have changed.
*/
public ValidationResult validateMessagesAndAssignOffsets(PrimitiveRef.LongRef offsetCounter,
MetricsRecorder metricsRecorder,
@@ -232,7 +232,7 @@ public class LogValidator {
now,
convertedRecords,
info.maxTimestamp,
- info.shallowOffsetOfMaxTimestamp,
+ info.offsetOfMaxTimestamp,
true,
recordConversionStats);
}
@@ -293,10 +293,7 @@ public class LogValidator {
if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now;
- if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
- offsetOfMaxTimestamp = offsetCounter.value - 1;
- else
- offsetOfMaxTimestamp = initialOffset;
+ offsetOfMaxTimestamp = initialOffset;
}
return new ValidationResult(
@@ -479,7 +476,7 @@ public class LogValidator {
logAppendTime,
records,
info.maxTimestamp,
- info.shallowOffsetOfMaxTimestamp,
+ info.offsetOfMaxTimestamp,
true,
recordConversionStats);
}