Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/12 19:11:33 UTC

[kafka] branch 2.5 updated: KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 7624e62  KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
7624e62 is described below

commit 7624e6247984223901aa34d7b7c2789c3e1d0c3f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 12 12:06:08 2020 -0700

    KAFKA-9669; Loosen validation of inner offsets for older message formats (#8647)
    
    Prior to KAFKA-8106, we allowed the v0 and v1 message formats to contain non-consecutive inner offsets. Inside `LogValidator`, we would detect this case and rewrite the batch. After KAFKA-8106, we changed the logic to raise an error for the v1 message format (v0 batches were still rewritten). This broke older clients that depended on the looser validation. This patch restores the old logic of rewriting the batch to fix the invalid inner offsets.
    
    Note that the v2 message format has always had this stricter validation. This patch also adds a test case covering it.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Ismael Juma <is...@juma.me.uk>
---
 .../kafka/common/record/MemoryRecordsBuilder.java  | 30 +++++++++++++
 core/src/main/scala/kafka/log/LogValidator.scala   | 21 ++++------
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 49 ++++++++++++++++++++--
 3 files changed, 85 insertions(+), 15 deletions(-)
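
To make the compatibility scenario concrete, the following is a minimal sketch (not part of the commit) that mirrors the test helper added below: it builds a compressed v1 batch whose inner records all carry offset 0, as some older clients produce. After KAFKA-8106 the broker rejected such a batch with INVALID_RECORD; with this patch it rewrites the inner offsets instead.

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record._

    // Build a compressed v1 batch in which every inner record reuses offset 0,
    // mimicking an older client that does not assign consecutive inner offsets.
    // appendUncheckedWithOffset is the test-only helper added by this commit.
    val buffer = ByteBuffer.allocate(1024)
    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1,
      CompressionType.GZIP, TimestampType.CREATE_TIME, 0L)

    (0 until 3).foreach { id =>
      builder.appendUncheckedWithOffset(0, new SimpleRecord(id.toString.getBytes))
    }
    val invalidRecords = builder.build()

    // With this patch, LogValidator rewrites such a v0/v1 batch, so the
    // validated records come back with offsets 0, 1, 2. For v2 the same input
    // is still rejected with InvalidRecordException.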

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 054fb86..fd5a680 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -588,6 +589,35 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     }
 
     /**
+     * Append a record without doing offset/magic validation (this should only be used in testing).
+     *
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void appendUncheckedWithOffset(long offset, SimpleRecord record) throws IOException {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            int offsetDelta = (int) (offset - baseOffset);
+            long timestamp = record.timestamp();
+            if (firstTimestamp == null)
+                firstTimestamp = timestamp;
+
+            int sizeInBytes = DefaultRecord.writeTo(appendStream,
+                offsetDelta,
+                timestamp - firstTimestamp,
+                record.key(),
+                record.value(),
+                record.headers());
+            recordWritten(offset, timestamp, sizeInBytes);
+        } else {
+            LegacyRecord legacyRecord = LegacyRecord.create(magic,
+                record.timestamp(),
+                Utils.toNullableArray(record.key()),
+                Utils.toNullableArray(record.value()));
+            appendUncheckedWithOffset(offset, legacyRecord);
+        }
+    }
+
+    /**
      * Append a record at the next sequential offset.
      * @param record the record to add
      */
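
The unchecked append exists because the regular append path validates offsets before writing, so it cannot be used to reproduce the malformed batches older clients send. A short usage sketch, assuming the checked appendWithOffset path's usual requirement that offsets increase within the batch:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record._

    val buffer = ByteBuffer.allocate(512)
    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1,
      CompressionType.GZIP, TimestampType.CREATE_TIME, 0L)

    builder.appendWithOffset(0L, new SimpleRecord(System.currentTimeMillis(), "a".getBytes))
    // A second appendWithOffset(0L, ...) would fail the builder's offset check;
    // the unchecked variant skips that validation, which is what LogValidatorTest
    // needs in order to build a batch with repeated inner offsets.
    builder.appendUncheckedWithOffset(0L, new SimpleRecord(System.currentTimeMillis(), "b".getBytes))
    val records = builder.build()
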
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 1291624..1da6683 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -366,16 +366,6 @@ private[log] object LogValidator extends Logging {
       else None
     }
 
-    def validateOffset(batchIndex: Int, record: Record, expectedOffset: Long): Option[ApiRecordError] = {
-      // inner records offset should always be continuous
-      if (record.offset != expectedOffset) {
-        brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-        Some(ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
-          s"Inner record $record inside the compressed record batch does not have " +
-            s"incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition.")))
-      } else None
-    }
-
     // No in place assignment situation 1
     var inPlaceAssignment = sourceCodec == targetCodec
 
@@ -422,8 +412,15 @@ private[log] object LogValidator extends Logging {
               if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
                 if (record.timestamp > maxTimestamp)
                   maxTimestamp = record.timestamp
-                validateOffset(batchIndex, record, expectedOffset)
-              } else None
+
+                // Some older clients do not implement the V1 internal offsets correctly.
+                // Historically the broker handled this by rewriting the batches rather
+                // than rejecting the request. We must continue this handling here to avoid
+                // breaking these clients.
+                if (record.offset != expectedOffset)
+                  inPlaceAssignment = false
+              }
+              None
             }
           }
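
The net effect of the change above: a non-incremental inner offset in a v0/v1 batch no longer produces an INVALID_RECORD error; it only clears inPlaceAssignment, so the batch is rebuilt downstream with freshly assigned offsets. A simplified, self-contained sketch of that decision (hypothetical helper, not the actual validator code):

    // Does any inner record deviate from the expected consecutive offsets?
    def needsRewrite(innerOffsets: Seq[Long], expectedBase: Long): Boolean =
      innerOffsets.zipWithIndex.exists { case (offset, i) => offset != expectedBase + i }

    needsRewrite(Seq(0L, 0L, 0L), expectedBase = 0L) // true  -> inPlaceAssignment = false, batch is rewritten
    needsRewrite(Seq(0L, 1L, 2L), expectedBase = 0L) // false -> batch keeps its offsets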
 
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 3dc4374..48bee7f 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.Metrics
 import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
 import kafka.common.{LongRef, RecordValidationException}
+import kafka.log.LogValidator.ValidationAndOffsetAssignResult
 import kafka.message._
 import kafka.server.BrokerTopicStats
 import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.InvalidRecordException
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
@@ -65,6 +65,29 @@ class LogValidatorTest {
   }
 
   @Test
+  def testValidationOfBatchesWithNonSequentialInnerOffsets(): Unit = {
+    def testMessageValidation(magicValue: Byte): Unit = {
+      val numRecords = 20
+      val invalidRecords = recordsWithNonSequentialInnerOffsets(magicValue, CompressionType.GZIP, numRecords)
+
+      // Validation for v2 and above is strict for this case. For older formats, we fix invalid
+      // internal offsets by rewriting the batch.
+      if (magicValue >= RecordBatch.MAGIC_VALUE_V2) {
+        assertThrows[InvalidRecordException] {
+          validateMessages(invalidRecords, magicValue, CompressionType.GZIP, CompressionType.GZIP)
+        }
+      } else {
+        val result = validateMessages(invalidRecords, magicValue, CompressionType.GZIP, CompressionType.GZIP)
+        assertEquals(0 until numRecords, result.validatedRecords.records.asScala.map(_.offset))
+      }
+    }
+
+    for (version <- RecordVersion.values) {
+      testMessageValidation(version.value)
+    }
+  }
+
+  @Test
   def testMisMatchMagic(): Unit = {
     checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
     checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
@@ -88,7 +111,10 @@ class LogValidatorTest {
     assertTrue(meterCount(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}") > 0)
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
+  private def validateMessages(records: MemoryRecords,
+                               magic: Byte,
+                               sourceCompressionType: CompressionType,
+                               targetCompressionType: CompressionType): ValidationAndOffsetAssignResult = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       topicPartition,
       new LongRef(0L),
@@ -1401,6 +1427,23 @@ class LogValidatorTest {
     }
   }
 
+  private def recordsWithNonSequentialInnerOffsets(magicValue: Byte,
+                                                   codec: CompressionType,
+                                                   numRecords: Int): MemoryRecords = {
+    val records = (0 until numRecords).map { id =>
+      new SimpleRecord(id.toString.getBytes)
+    }
+
+    val buffer = ByteBuffer.allocate(1024)
+    val builder = MemoryRecords.builder(buffer, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+
+    records.foreach { record =>
+      builder.appendUncheckedWithOffset(0, record)
+    }
+
+    builder.build()
+  }
+
   private def recordsWithInvalidInnerMagic(batchMagicValue: Byte,
                                            recordMagicValue: Byte,
                                            codec: CompressionType): MemoryRecords = {