You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/09 17:17:59 UTC

[kafka] branch 1.1 updated: MINOR: Fix record conversion time in metrics (#4671)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 1bae9f4  MINOR: Fix record conversion time in metrics (#4671)
1bae9f4 is described below

commit 1bae9f44f90ff9b9d8c6c19f0fef5f1e95843d84
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 9 16:50:34 2018 +0000

    MINOR: Fix record conversion time in metrics (#4671)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/LogValidator.scala   |  3 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 33 ++++++++++++++++------
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 1beb2bd..6515260 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -130,6 +130,7 @@ private[kafka] object LogValidator extends Logging {
                                                    toMagicValue: Byte,
                                                    partitionLeaderEpoch: Int,
                                                    isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+    val startNanos = time.nanoseconds
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
       CompressionType.NONE, records.records)
 
@@ -155,7 +156,7 @@ private[kafka] object LogValidator extends Logging {
 
     val info = builder.info
     val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
-      builder.numRecords, time.nanoseconds - now)
+      builder.numRecords, time.nanoseconds - startNanos)
     ValidationAndOffsetAssignResult(
       validatedRecords = convertedRecords,
       maxTimestamp = info.maxTimestamp,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 04e89528..f68ff9e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -17,6 +17,7 @@
 package kafka.log
 
 import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
 
 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
@@ -683,7 +684,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
     checkOffsets(records, 0)
     val offset = 1234567
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -694,7 +695,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = false)
   }
 
   @Test
@@ -702,7 +706,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
     checkOffsets(records, 0)
     val offset = 1234567
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -713,7 +717,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = false)
   }
 
   @Test
@@ -721,7 +728,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -732,7 +739,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -740,7 +750,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -751,7 +761,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test(expected = classOf[InvalidRecordException])
@@ -1122,8 +1135,10 @@ class LogValidatorTest {
                                    compressed: Boolean): Unit = {
     assertNotNull("Records processing info is null", stats)
     assertEquals(numConvertedRecords, stats.numRecordsConverted)
-    if (numConvertedRecords > 0)
+    if (numConvertedRecords > 0) {
       assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos >= 0)
+      assertTrue(s"Conversion time not valid $stats", stats.conversionTimeNanos <= TimeUnit.MINUTES.toNanos(1))
+    }
     val originalSize = records.sizeInBytes
     val tempBytes = stats.temporaryMemoryBytes
     if (numConvertedRecords > 0 && compressed)

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.