You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/08 06:31:21 UTC

[kafka] branch trunk updated: KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1848f04  KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)
1848f04 is described below

commit 1848f049e106101293739f6471d3d589c4018e5e
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Tue Mar 8 14:28:16 2022 +0800

    KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)
    
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Ricardo Brasil <an...@gmail.com>
---
 core/src/main/scala/kafka/log/LogValidator.scala          | 13 +++++++++----
 core/src/test/scala/unit/kafka/log/LogValidatorTest.scala |  9 ++++-----
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b40598f..0949c11 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException}
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.server.{BrokerTopicStats, RequestLocal}
 import kafka.utils.Logging
-import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.TopicPartition
@@ -571,9 +571,14 @@ private[log] object LogValidator extends Logging {
   private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
     if (recordErrors.nonEmpty) {
       val errors = recordErrors.map(_.recordError)
-      throw new RecordValidationException(new InvalidRecordException(
-        "One or more records have been rejected due to " + errors.size + " record errors " +
-          "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
+      if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
+        throw new RecordValidationException(new InvalidTimestampException(
+          "One or more records have been rejected due to invalid timestamp"), errors)
+      } else {
+        throw new RecordValidationException(new InvalidRecordException(
+          "One or more records have been rejected due to " + errors.size + " record errors " +
+            "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 5676a8e..4275684 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -25,7 +25,7 @@ import kafka.message._
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.{BrokerTopicStats, RequestLocal}
 import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
@@ -1352,7 +1352,7 @@ class LogValidatorTest {
         requestLocal = RequestLocal.withThreadConfinedCaching)
     )
 
-    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
     assertTrue(e.recordErrors.nonEmpty)
     assertEquals(e.recordErrors.size, 3)
   }
@@ -1397,9 +1397,8 @@ class LogValidatorTest {
         RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
     )
     // if there is a mix of both regular InvalidRecordException and InvalidTimestampException,
-    // InvalidTimestampException is no longer takes precedence. The type of invalidException
-    // is unified as InvalidRecordException
-    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    // InvalidTimestampException takes precedence
+    assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
     assertTrue(e.recordErrors.nonEmpty)
     assertEquals(6, e.recordErrors.size)
   }