You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/09 16:50:43 UTC
[kafka] branch trunk updated: MINOR: Fix record conversion time in
metrics (#4671)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ef2fb8 MINOR: Fix record conversion time in metrics (#4671)
3ef2fb8 is described below
commit 3ef2fb843e873a4597b1d1d068144102271d3002
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 9 16:50:34 2018 +0000
MINOR: Fix record conversion time in metrics (#4671)
Reviewers: Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/log/LogValidator.scala | 3 +-
.../scala/unit/kafka/log/LogValidatorTest.scala | 33 ++++++++++++++++------
2 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 1beb2bd..6515260 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -130,6 +130,7 @@ private[kafka] object LogValidator extends Logging {
toMagicValue: Byte,
partitionLeaderEpoch: Int,
isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+ val startNanos = time.nanoseconds
val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
CompressionType.NONE, records.records)
@@ -155,7 +156,7 @@ private[kafka] object LogValidator extends Logging {
val info = builder.info
val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
- builder.numRecords, time.nanoseconds - now)
+ builder.numRecords, time.nanoseconds - startNanos)
ValidationAndOffsetAssignResult(
validatedRecords = convertedRecords,
maxTimestamp = info.maxTimestamp,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 04e89528..f68ff9e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -17,6 +17,7 @@
package kafka.log
import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
import kafka.common.LongRef
import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
@@ -683,7 +684,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
checkOffsets(records, 0)
val offset = 1234567
- checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
time = time,
now = System.currentTimeMillis(),
@@ -694,7 +695,10 @@ class LogValidatorTest {
timestampType = TimestampType.LOG_APPEND_TIME,
timestampDiffMaxMs = 1000L,
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
- isFromClient = true).validatedRecords, offset)
+ isFromClient = true)
+ checkOffsets(validatedResults.validatedRecords, offset)
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ compressed = false)
}
@Test
@@ -702,7 +706,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
checkOffsets(records, 0)
val offset = 1234567
- checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
time = time,
now = System.currentTimeMillis(),
@@ -713,7 +717,10 @@ class LogValidatorTest {
timestampType = TimestampType.LOG_APPEND_TIME,
timestampDiffMaxMs = 1000L,
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
- isFromClient = true).validatedRecords, offset)
+ isFromClient = true)
+ checkOffsets(validatedResults.validatedRecords, offset)
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ compressed = false)
}
@Test
@@ -721,7 +728,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val offset = 1234567
checkOffsets(records, 0)
- checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
time = time,
now = System.currentTimeMillis(),
@@ -732,7 +739,10 @@ class LogValidatorTest {
timestampType = TimestampType.LOG_APPEND_TIME,
timestampDiffMaxMs = 1000L,
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
- isFromClient = true).validatedRecords, offset)
+ isFromClient = true)
+ checkOffsets(validatedResults.validatedRecords, offset)
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ compressed = true)
}
@Test
@@ -740,7 +750,7 @@ class LogValidatorTest {
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val offset = 1234567
checkOffsets(records, 0)
- checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
time = time,
now = System.currentTimeMillis(),
@@ -751,7 +761,10 @@ class LogValidatorTest {
timestampType = TimestampType.LOG_APPEND_TIME,
timestampDiffMaxMs = 1000L,
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
- isFromClient = true).validatedRecords, offset)
+ isFromClient = true)
+ checkOffsets(validatedResults.validatedRecords, offset)
+ verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ compressed = true)
}
@Test(expected = classOf[InvalidRecordException])
@@ -1122,8 +1135,10 @@ class LogValidatorTest {
compressed: Boolean): Unit = {
assertNotNull("Records processing info is null", stats)
assertEquals(numConvertedRecords, stats.numRecordsConverted)
- if (numConvertedRecords > 0)
+ if (numConvertedRecords > 0) {
assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos >= 0)
+ assertTrue(s"Conversion time not valid $stats", stats.conversionTimeNanos <= TimeUnit.MINUTES.toNanos(1))
+ }
val originalSize = records.sizeInBytes
val tempBytes = stats.temporaryMemoryBytes
if (numConvertedRecords > 0 && compressed)
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.