You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/08 06:31:21 UTC
[kafka] branch trunk updated: KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1848f04 KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)
1848f04 is described below
commit 1848f049e106101293739f6471d3d589c4018e5e
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Tue Mar 8 14:28:16 2022 +0800
KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)
Reviewers: Guozhang Wang <gu...@confluent.io>, Ricardo Brasil <an...@gmail.com>
---
core/src/main/scala/kafka/log/LogValidator.scala | 13 +++++++++----
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala | 9 ++++-----
2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index b40598f..0949c11 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException}
import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.Logging
-import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.TopicPartition
@@ -571,9 +571,14 @@ private[log] object LogValidator extends Logging {
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
if (recordErrors.nonEmpty) {
val errors = recordErrors.map(_.recordError)
- throw new RecordValidationException(new InvalidRecordException(
- "One or more records have been rejected due to " + errors.size + " record errors " +
- "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
+ if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
+ throw new RecordValidationException(new InvalidTimestampException(
+ "One or more records have been rejected due to invalid timestamp"), errors)
+ } else {
+ throw new RecordValidationException(new InvalidRecordException(
+ "One or more records have been rejected due to " + errors.size + " record errors " +
+ "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 5676a8e..4275684 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -25,7 +25,7 @@ import kafka.message._
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
@@ -1352,7 +1352,7 @@ class LogValidatorTest {
requestLocal = RequestLocal.withThreadConfinedCaching)
)
- assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+ assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(e.recordErrors.size, 3)
}
@@ -1397,9 +1397,8 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
)
// if there is a mix of both regular InvalidRecordException and InvalidTimestampException,
- // InvalidTimestampException is no longer takes precedence. The type of invalidException
- // is unified as InvalidRecordException
- assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+ // InvalidTimestampException takes precedence
+ assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(6, e.recordErrors.size)
}