You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/02 00:35:34 UTC
[kafka] branch 2.4 updated: KAFKA-9080: Revert the check added to
validate non-compressed record batch does have continuous incremental
offsets
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new e49e0f7 KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets
e49e0f7 is described below
commit e49e0f7910cf268124b44c75cac3cb4460fb9fa0
Author: Tu Tran <tu...@confluent.io>
AuthorDate: Sat Nov 2 06:03:27 2019 +0530
KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets
#7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable to message formats V0 and V1. Therefore, this commit reverts that check, removing it entirely rather than gating it on the record version.
Author: Tu Tran <tu...@confluent.io>
Reviewers: Manikumar Reddy <ma...@gmail.com>
Closes #7628 from tuvtran/KAFKA-9080
---
core/src/main/scala/kafka/log/LogValidator.scala | 11 -----------
1 file changed, 11 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 70bf3bf..c4dda08 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging {
magic: Byte,
brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
var maxTimestamp = RecordBatch.NO_TIMESTAMP
- val expectedInnerOffset = new LongRef(0)
var offsetOfMaxTimestamp = -1L
val initialOffset = offsetCounter.value
@@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging {
for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
- val expectedOffset = expectedInnerOffset.getAndIncrement()
-
- // inner records offset should always be continuous
- if (record.offset != expectedOffset) {
- brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
- throw new RecordValidationException(
- new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
- List(new RecordError(batchIndex)))
- }
-
val offset = offsetCounter.getAndIncrement()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
maxBatchTimestamp = record.timestamp