You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/16 17:06:04 UTC

[kafka] branch trunk updated: KAFKA-9617 Replica Fetcher can mark partition as failed when max.message.bytes is changed (#8659)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78e18b5  KAFKA-9617 Replica Fetcher can mark partition as failed when max.message.bytes is changed (#8659)
78e18b5 is described below

commit 78e18b575c6b4724afe6107371d70c46fb57d3d9
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Sun May 17 01:05:17 2020 +0800

    KAFKA-9617 Replica Fetcher can mark partition as failed when max.message.bytes is changed (#8659)
    
    Skip to check the size of record if the record is already accepted by leader.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 22 ++++++++++++++--------
 core/src/test/scala/unit/kafka/log/LogTest.scala | 16 ++++++++++++++++
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 14e414c..d046368 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1010,7 +1010,7 @@ class Log(@volatile private var _dir: File,
                      leaderEpoch: Int,
                      origin: AppendOrigin = AppendOrigin.Client,
                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
-    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
+    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false)
   }
 
   /**
@@ -1025,7 +1025,9 @@ class Log(@volatile private var _dir: File,
       origin = AppendOrigin.Replication,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       assignOffsets = false,
-      leaderEpoch = -1)
+      leaderEpoch = -1,
+      // disable to check the validation of record size since the record is already accepted by leader.
+      ignoreRecordSize = true)
   }
 
   /**
@@ -1039,6 +1041,7 @@ class Log(@volatile private var _dir: File,
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+   * @param ignoreRecordSize true to skip validation of record size.
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
    * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
@@ -1048,9 +1051,10 @@ class Log(@volatile private var _dir: File,
                      origin: AppendOrigin,
                      interBrokerProtocolVersion: ApiVersion,
                      assignOffsets: Boolean,
-                     leaderEpoch: Int): LogAppendInfo = {
+                     leaderEpoch: Int,
+                     ignoreRecordSize: Boolean): LogAppendInfo = {
     maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
-      val appendInfo = analyzeAndValidateRecords(records, origin)
+      val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize)
 
       // return if we have no valid messages or if this is a duplicate of the last appended entry
       if (appendInfo.shallowCount == 0)
@@ -1097,7 +1101,7 @@ class Log(@volatile private var _dir: File,
 
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
-          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
+          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
             for (batch <- validRecords.batches.asScala) {
               if (batch.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
@@ -1321,7 +1325,7 @@ class Log(@volatile private var _dir: File,
    * Validate the following:
    * <ol>
    * <li> each message matches its CRC
-   * <li> each message size is valid
+   * <li> each message size is valid (if ignoreRecordSize is false)
    * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other.
    * </ol>
    *
@@ -1335,7 +1339,9 @@ class Log(@volatile private var _dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords,
+                                        origin: AppendOrigin,
+                                        ignoreRecordSize: Boolean): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
@@ -1375,7 +1381,7 @@ class Log(@volatile private var _dir: File,
 
       // Check if the message sizes are valid.
       val batchSize = batch.sizeInBytes
-      if (batchSize > config.maxMessageSize) {
+      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
         brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
         brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
         throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 41dc7e9..94238d8 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2078,6 +2078,22 @@ class LogTest {
       case _: RecordTooLargeException => // this is good
     }
   }
+
+  @Test
+  def testMessageSizeCheckInAppendAsFollower(): Unit = {
+    val first = MemoryRecords.withRecords(0, CompressionType.NONE, 0,
+      new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
+    val second = MemoryRecords.withRecords(5, CompressionType.NONE, 0,
+      new SimpleRecord("change (I need more bytes)... blah blah blah.".getBytes),
+      new SimpleRecord("More padding boo hoo".getBytes))
+
+    val log = createLog(logDir, LogTest.createLogConfig(maxMessageBytes = second.sizeInBytes - 1))
+
+    log.appendAsFollower(first)
+    // the second record is larger then limit but appendAsFollower does not validate the size.
+    log.appendAsFollower(second)
+  }
+
   /**
    * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
    */