You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/02 00:35:34 UTC

[kafka] branch 2.4 updated: KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new e49e0f7  KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets
e49e0f7 is described below

commit e49e0f7910cf268124b44c75cac3cb4460fb9fa0
Author: Tu Tran <tu...@confluent.io>
AuthorDate: Sat Nov 2 06:03:27 2019 +0530

    KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets
    
    #7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable for message format V0 and V1. Therefore, I added a condition to disable the check if the record version precedes V2.
    
    Author: Tu Tran <tu...@confluent.io>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
    
    Closes #7628 from tuvtran/KAFKA-9080
---
 core/src/main/scala/kafka/log/LogValidator.scala | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 70bf3bf..c4dda08 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging {
                                          magic: Byte,
                                          brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
-    val expectedInnerOffset = new LongRef(0)
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
@@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging {
       for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
         validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
 
-        val expectedOffset = expectedInnerOffset.getAndIncrement()
-
-        // inner records offset should always be continuous
-        if (record.offset != expectedOffset) {
-          brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-          throw new RecordValidationException(
-            new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
-            List(new RecordError(batchIndex)))
-        }
-
         val offset = offsetCounter.getAndIncrement()
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
           maxBatchTimestamp = record.timestamp