You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/22 15:34:07 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

chia7712 commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492834735



##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec)
 
-    for (batch <- records.batches.asScala) {
+    records.batches.forEach { batch =>
       validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats)
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+      var batchIndex = 0
+      batch.forEach { record =>
         validateRecord(batch, topicPartition, record, batchIndex, now, timestampType,
           timestampDiffMaxMs, compactedTopic, brokerTopicStats).foreach(recordError => recordErrors += recordError)
         // we fail the batch if any record fails, so we stop appending if any record fails
-        if (recordErrors.isEmpty)
-          builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+        if (recordErrors.isEmpty) builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+        batchIndex += 1

Review comment:
       > Have we benchmarked this path?
   
    I didn't benchmark this path, and you are right that the optimization is small since we have to convert the data in this path anyway. I will revert it to keep the patch small. 

##########
File path: core/src/main/scala/kafka/log/LogValidator.scala
##########
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
     val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec)
 
-    for (batch <- records.batches.asScala) {
+    records.batches.forEach { batch =>
       validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
 
       val recordErrors = new ArrayBuffer[ApiRecordError](0)
-      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+      var batchIndex = 0

Review comment:
       copy that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org