Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/11 15:43:52 UTC

[GitHub] [kafka] chia7712 opened a new pull request #9162: MINOR: refactor Log#read to avoid using "return" keyword in nested an…

chia7712 opened a new pull request #9162:
URL: https://github.com/apache/kafka/pull/9162


   Scala throws and then catches ```NonLocalReturnException``` to implement the control flow of returning from a nested anonymous function. That is an anti-pattern, and we should avoid it in hot methods.
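   
   A minimal, self-contained sketch (not Kafka code; the names are illustrative) of the pattern: a `return` inside a closure compiles to a throw of a control-flow throwable (`scala.runtime.NonLocalReturnControl` in current Scala versions) that the enclosing method catches, so every early exit from the lambda goes through a throw/catch.
   
   ```scala
   // Anti-pattern: `return` inside the foreach lambda is a non-local return.
   // scalac implements it by throwing a NonLocalReturnControl carrying the
   // result and wrapping the body of firstNegative in a matching catch.
   def firstNegative(xs: Seq[Int]): Option[Int] = {
     xs.foreach { x =>
       if (x < 0) return Some(x)
     }
     None
   }
   
   // Equivalent refactor with no exception-based control flow.
   def firstNegativeNoReturn(xs: Seq[Int]): Option[Int] =
     xs.find(_ < 0)
   ```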
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ijuma commented on pull request #9162: MINOR: refactor Log#read to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672294311


   Ah, is `maybeHandleIOException` the issue?





[GitHub] [kafka] ijuma commented on a change in pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#discussion_r511620727



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1099,171 +1099,169 @@ class Log(@volatile private var _dir: File,
       val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize)
 
       // return if we have no valid messages or if this is a duplicate of the last appended entry
-      if (appendInfo.shallowCount == 0)
-        return appendInfo
+      if (appendInfo.shallowCount == 0) appendInfo
+      else {
 
-      // trim any invalid bytes or partial messages before appending it to the on-disk log
-      var validRecords = trimInvalidBytes(records, appendInfo)
+        // trim any invalid bytes or partial messages before appending it to the on-disk log
+        var validRecords = trimInvalidBytes(records, appendInfo)
 
-      // they are valid, insert them in the log
-      lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        if (assignOffsets) {
-          // assign offsets to the message set
-          val offset = new LongRef(nextOffsetMetadata.messageOffset)
-          appendInfo.firstOffset = Some(offset.value)
-          val now = time.milliseconds
-          val validateAndOffsetAssignResult = try {
-            LogValidator.validateMessagesAndAssignOffsets(validRecords,
-              topicPartition,
-              offset,
-              time,
-              now,
-              appendInfo.sourceCodec,
-              appendInfo.targetCodec,
-              config.compact,
-              config.messageFormatVersion.recordVersion.value,
-              config.messageTimestampType,
-              config.messageTimestampDifferenceMaxMs,
-              leaderEpoch,
-              origin,
-              interBrokerProtocolVersion,
-              brokerTopicStats)
-          } catch {
-            case e: IOException =>
-              throw new KafkaException(s"Error validating messages while appending to log $name", e)
-          }
-          validRecords = validateAndOffsetAssignResult.validatedRecords
-          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
-          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
-          appendInfo.lastOffset = offset.value - 1
-          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
-          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
-            appendInfo.logAppendTime = now
-
-          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
-          // format conversion)
-          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
-            for (batch <- validRecords.batches.asScala) {
-              if (batch.sizeInBytes > config.maxMessageSize) {
-                // we record the original message set size instead of the trimmed size
-                // to be consistent with pre-compression bytesRejectedRate recording
-                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
-                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
-              }
+        // they are valid, insert them in the log
+        lock synchronized {
+          checkIfMemoryMappedBufferClosed()
+          if (assignOffsets) {
+            // assign offsets to the message set
+            val offset = new LongRef(nextOffsetMetadata.messageOffset)
+            appendInfo.firstOffset = Some(offset.value)
+            val now = time.milliseconds
+            val validateAndOffsetAssignResult = try {
+              LogValidator.validateMessagesAndAssignOffsets(validRecords,
+                topicPartition,
+                offset,
+                time,
+                now,
+                appendInfo.sourceCodec,
+                appendInfo.targetCodec,
+                config.compact,
+                config.messageFormatVersion.recordVersion.value,
+                config.messageTimestampType,
+                config.messageTimestampDifferenceMaxMs,
+                leaderEpoch,
+                origin,
+                interBrokerProtocolVersion,
+                brokerTopicStats)
+            } catch {
+              case e: IOException =>
+                throw new KafkaException(s"Error validating messages while appending to log $name", e)
             }
-          }
-        } else {
-          // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic)
-            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
-                                                 records.records.asScala.map(_.offset))
-
-          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
-            // we may still be able to recover if the log is empty
-            // one example: fetching from log start offset on the leader which is not batch aligned,
-            // which may happen as a result of AdminClient#deleteRecords()
-            val firstOffset = appendInfo.firstOffset match {
-              case Some(offset) => offset
-              case None => records.batches.asScala.head.baseOffset()
+            validRecords = validateAndOffsetAssignResult.validatedRecords
+            appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
+            appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
+            appendInfo.lastOffset = offset.value - 1
+            appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
+            if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
+              appendInfo.logAppendTime = now
+
+            // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
+            // format conversion)
+            if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
+              for (batch <- validRecords.batches.asScala) {
+                if (batch.sizeInBytes > config.maxMessageSize) {
+                  // we record the original message set size instead of the trimmed size
+                  // to be consistent with pre-compression bytesRejectedRate recording
+                  brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+                  brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
+                  throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
+                    s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
+                }
+              }
             }
+          } else {
+            // we are taking the offsets we are given
+            if (!appendInfo.offsetsMonotonic)
+              throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+                records.records.asScala.map(_.offset))
+
+            if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
+              // we may still be able to recover if the log is empty
+              // one example: fetching from log start offset on the leader which is not batch aligned,
+              // which may happen as a result of AdminClient#deleteRecords()
+              val firstOffset = appendInfo.firstOffset match {
+                case Some(offset) => offset
+                case None => records.batches.asScala.head.baseOffset()
+              }
 
-            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
-            throw new UnexpectedAppendOffsetException(
-              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
-              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
-              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
-              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
-              firstOffset, appendInfo.lastOffset)
+              val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
+              throw new UnexpectedAppendOffsetException(
+                s"Unexpected offset in append to $topicPartition. $firstOrLast " +
+                  s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
+                  s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+                  s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
+                firstOffset, appendInfo.lastOffset)
+            }
           }
-        }
 
-        // update the epoch cache with the epoch stamped onto the message by the leader
-        validRecords.batches.forEach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
-          } else {
-            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
-            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
-            // to truncation by high watermark after the next leader election.
-            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
-              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
-              cache.clearAndFlush()
+          // update the epoch cache with the epoch stamped onto the message by the leader
+          validRecords.batches.forEach { batch =>
+            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+              maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
+            } else {
+              // In partial upgrade scenarios, we may get a temporary regression to the message format. In
+              // order to ensure the safety of leader election, we clear the epoch cache so that we revert
+              // to truncation by high watermark after the next leader election.
+              leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+                warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
+                cache.clearAndFlush()
+              }
             }
           }
-        }
 
-        // check messages set size may be exceed config.segmentSize
-        if (validRecords.sizeInBytes > config.segmentSize) {
-          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
-            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
-        }
+          // check messages set size may be exceed config.segmentSize
+          if (validRecords.sizeInBytes > config.segmentSize) {
+            throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
+              s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
+          }
 
-        // maybe roll the log if this segment is full
-        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
-
-        val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
-          segmentBaseOffset = segment.baseOffset,
-          relativePositionInSegment = segment.size)
-
-        // now that we have valid records, offsets assigned, and timestamps updated, we need to
-        // validate the idempotent/transactional state of the producers and collect some metadata
-        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
-          logOffsetMetadata, validRecords, origin)
-
-        maybeDuplicate.foreach { duplicate =>
-          appendInfo.firstOffset = Some(duplicate.firstOffset)
-          appendInfo.lastOffset = duplicate.lastOffset
-          appendInfo.logAppendTime = duplicate.timestamp
-          appendInfo.logStartOffset = logStartOffset
-          return appendInfo
-        }
+          // maybe roll the log if this segment is full
+          val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
-        segment.append(largestOffset = appendInfo.lastOffset,
-          largestTimestamp = appendInfo.maxTimestamp,
-          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
-          records = validRecords)
-
-        // Increment the log end offset. We do this immediately after the append because a
-        // write to the transaction index below may fail and we want to ensure that the offsets
-        // of future appends still grow monotonically. The resulting transaction index inconsistency
-        // will be cleaned up after the log directory is recovered. Note that the end offset of the
-        // ProducerStateManager will not be updated and the last stable offset will not advance
-        // if the append to the transaction index fails.
-        updateLogEndOffset(appendInfo.lastOffset + 1)
-
-        // update the producer state
-        for (producerAppendInfo <- updatedProducers.values) {
-          producerStateManager.update(producerAppendInfo)
-        }
+          val logOffsetMetadata = LogOffsetMetadata(
+            messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
+            segmentBaseOffset = segment.baseOffset,
+            relativePositionInSegment = segment.size)
 
-        // update the transaction index with the true last stable offset. The last offset visible
-        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
-        for (completedTxn <- completedTxns) {
-          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
-          segment.updateTxnIndex(completedTxn, lastStableOffset)
-          producerStateManager.completeTxn(completedTxn)
-        }
+          // now that we have valid records, offsets assigned, and timestamps updated, we need to
+          // validate the idempotent/transactional state of the producers and collect some metadata
+          val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
+            logOffsetMetadata, validRecords, origin)
 
-        // always update the last producer id map offset so that the snapshot reflects the current offset
-        // even if there isn't any idempotent data being written
-        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
+          if (maybeDuplicate.isDefined) {

Review comment:
       Seems like we should use a pattern match here instead of `isDefined` and `get`.
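   
       A minimal, self-contained sketch of the suggested style; `BatchMetadata` and its fields are hypothetical stand-ins for the type carried by `maybeDuplicate`:
   
    ```scala
    // Pattern matching makes both branches explicit and avoids isDefined/get.
    final case class BatchMetadata(firstOffset: Long, lastOffset: Long)
   
    def resolveAppend(maybeDuplicate: Option[BatchMetadata]): String =
      maybeDuplicate match {
        case Some(duplicate) => s"duplicate batch: offsets ${duplicate.firstOffset}..${duplicate.lastOffset}"
        case None            => "no duplicate; perform the append"
      }
    ```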







[GitHub] [kafka] chia7712 commented on a change in pull request #9162: MINOR: refactor Log#read to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#discussion_r468999490



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1501,43 +1501,36 @@ class Log(@volatile private var _dir: File,
         case FetchTxnCommitted => fetchLastStableOffsetMetadata
       }
 
-      if (startOffset == maxOffsetMetadata.messageOffset) {
-        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      } else if (startOffset > maxOffsetMetadata.messageOffset) {
-        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
-        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
-      }
-
-      // Do the read on the segment with a base offset less than the target offset
-      // but if that segment doesn't contain any messages with an offset greater than that
-      // continue to read from successive segments until we get some messages or we reach the end of the log
-      while (segmentEntry != null) {
-        val segment = segmentEntry.getValue
-
-        val maxPosition = {
-          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
-            maxOffsetMetadata.relativePositionInSegment
-          } else {
-            segment.size
-          }
-        }
-
-        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
-        if (fetchInfo == null) {
-          segmentEntry = segments.higherEntry(segmentEntry.getKey)
-        } else {
-          return if (includeAbortedTxns)
-            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
-          else
-            fetchInfo
-        }
+      if (startOffset == maxOffsetMetadata.messageOffset)
+        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+      else if (startOffset > maxOffsetMetadata.messageOffset)
+        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
+      else {
+        // Do the read on the segment with a base offset less than the target offset
+        // but if that segment doesn't contain any messages with an offset greater than that
+        // continue to read from successive segments until we get some messages or we reach the end of the log
+        var currentEntry = segmentFloorEntry
+        Iterator.continually(currentEntry)

Review comment:
       you are right!







[GitHub] [kafka] chia7712 edited a comment on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 edited a comment on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-717022055


   ```
   org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions
   
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
   ```
   
   unrelated error.





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-675344443


   retest this please





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-703261725


   ```
   Build / JDK 8 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```
   unrelated error. rebase





[GitHub] [kafka] ijuma commented on pull request #9162: MINOR: refactor Log#read and Log#fetchOffsetByTimestamp to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672907940


   Yeah, I would fix all the cases in this file at once if possible.





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-679805254


   retest this please





[GitHub] [kafka] ijuma commented on a change in pull request #9162: MINOR: refactor Log#read to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#discussion_r468882551



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1501,43 +1501,36 @@ class Log(@volatile private var _dir: File,
         case FetchTxnCommitted => fetchLastStableOffsetMetadata
       }
 
-      if (startOffset == maxOffsetMetadata.messageOffset) {
-        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      } else if (startOffset > maxOffsetMetadata.messageOffset) {
-        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
-        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
-      }
-
-      // Do the read on the segment with a base offset less than the target offset
-      // but if that segment doesn't contain any messages with an offset greater than that
-      // continue to read from successive segments until we get some messages or we reach the end of the log
-      while (segmentEntry != null) {
-        val segment = segmentEntry.getValue
-
-        val maxPosition = {
-          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
-            maxOffsetMetadata.relativePositionInSegment
-          } else {
-            segment.size
-          }
-        }
-
-        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
-        if (fetchInfo == null) {
-          segmentEntry = segments.higherEntry(segmentEntry.getKey)
-        } else {
-          return if (includeAbortedTxns)
-            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
-          else
-            fetchInfo
-        }
+      if (startOffset == maxOffsetMetadata.messageOffset)
+        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+      else if (startOffset > maxOffsetMetadata.messageOffset)
+        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
+      else {
+        // Do the read on the segment with a base offset less than the target offset
+        // but if that segment doesn't contain any messages with an offset greater than that
+        // continue to read from successive segments until we get some messages or we reach the end of the log
+        var currentEntry = segmentFloorEntry
+        Iterator.continually(currentEntry)

Review comment:
       Since this code is pretty critical, I'd keep the previous while loop and add a boolean to indicate a break or something like that.
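   
       A self-contained sketch of that shape, with a hypothetical `entries` iterator standing in for the segment-map traversal:
   
    ```scala
    // Keep the while loop; replace the early `return` with a `done` flag and a
    // mutable result, so no control-flow exception is thrown on the hot path.
    def firstNonEmpty(entries: Iterator[String]): Option[String] = {
      var done = false
      var result: Option[String] = None
      while (!done && entries.hasNext) {
        val entry = entries.next()
        if (entry.nonEmpty) {
          result = Some(entry) // found data: record it and stop
          done = true
        }
        // otherwise fall through and continue with the next entry
      }
      result
    }
    ```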







[GitHub] [kafka] chia7712 edited a comment on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 edited a comment on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-678590561


   ```testMetadataRetries``` is tracked by #9091
   
   ```DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota``` passes on my local machine





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log#read and Log#fetchOffsetByTimestamp to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672891234


   > what are the other cases where we do this in Log?
   
   The use case I mentioned is a heavy IO-load task in my team's application. I didn't trace other potential hot methods in the remaining use cases because the IO-heavy path seems to be the most important case to care about.
   
   Besides, ```Log``` has a bunch of anonymous functions that use "return" for control flow. Maybe we should fix them all at once even if they are not hot methods :)





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-673942147


   only ```shouldUpgradeFromEosAlphaToEosBeta``` fails. retest this please





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-696523232


   ```
   Build / JDK 15 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   Build / JDK 8 / kafka.api.ConsumerBounceTest.testClose
   ```
   
   unrelated failure. rebase to trigger QA





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-678590561


   ```testMetadataRetries``` is tracked by #9091





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672957838


   > Yeah, I would fix all the cases in this file at once if possible.
   
   sure





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-690884676


   ```
   Build / JDK 11 / kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   ```
   passes on my local machine
   
   ```
   Build / JDK 15 / org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries
   ```
   it is already fixed by https://github.com/apache/kafka/commit/e4eab377e1489fc0cc90edec3eeb382b1192a442





[GitHub] [kafka] chia7712 merged pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 merged pull request #9162:
URL: https://github.com/apache/kafka/pull/9162


   





[GitHub] [kafka] chia7712 commented on a change in pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#discussion_r511728437



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1535,42 +1533,48 @@ class Log(@volatile private var _dir: File,
       }
 
       if (startOffset == maxOffsetMetadata.messageOffset) {
-        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
       } else if (startOffset > maxOffsetMetadata.messageOffset) {
         val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
-        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
-      }
+        emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
+      } else {
 
-      // Do the read on the segment with a base offset less than the target offset
-      // but if that segment doesn't contain any messages with an offset greater than that
-      // continue to read from successive segments until we get some messages or we reach the end of the log
-      while (segmentEntry != null) {
-        val segment = segmentEntry.getValue
+        // Do the read on the segment with a base offset less than the target offset
+        // but if that segment doesn't contain any messages with an offset greater than that
+        // continue to read from successive segments until we get some messages or we reach the end of the log
+        var done = segmentEntry == null
+        var fetchDataInfo: FetchDataInfo = null
+        while (!done) {
+          val segment = segmentEntry.getValue
+
+          val maxPosition = {
+            // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
+            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
+              maxOffsetMetadata.relativePositionInSegment
+            } else {
+              segment.size
+            }
+          }
 
-        val maxPosition = {
-          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
-            maxOffsetMetadata.relativePositionInSegment
+          val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)

Review comment:
       good style. Will copy that.













[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-717023388


   added a trivial change to trigger QA again





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log#read and Log#fetchOffsetByTimestamp to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672737657


   I profiled one of our use cases to trace the ```NonLocalReturnException```. ```Log#read``` and ```Log#fetchOffsetByTimestamp``` produce most of the ```NonLocalReturnException``` instances, so I only refactored those two methods to keep this patch small.





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-717116228


   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
   ```
   it is tracked by https://issues.apache.org/jira/browse/KAFKA-10017





[GitHub] [kafka] ijuma commented on pull request #9162: MINOR: refactor Log#read and Log#fetchOffsetByTimestamp to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672882880


   @chia7712 what are the other cases where we do this in `Log`?





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-689963179


   @ijuma I have attached the test results and ```NonLocalReturnException``` is gone. Please take a look.





[GitHub] [kafka] ijuma commented on pull request #9162: MINOR: refactor Log#read to avoid using "return" keyword in nested an…

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-672294145


   Are we using `return` inside an anonymous function here?





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-697223860


   ```
   Build / JDK 8 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```
   It is flaky on my local machine, so it should be unrelated to this patch. @ijuma Could you take a look?





[GitHub] [kafka] ijuma commented on a change in pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#discussion_r511617718



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1535,42 +1533,48 @@ class Log(@volatile private var _dir: File,
       }
 
       if (startOffset == maxOffsetMetadata.messageOffset) {
-        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
       } else if (startOffset > maxOffsetMetadata.messageOffset) {
         val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
-        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
-      }
+        emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
+      } else {
 
-      // Do the read on the segment with a base offset less than the target offset
-      // but if that segment doesn't contain any messages with an offset greater than that
-      // continue to read from successive segments until we get some messages or we reach the end of the log
-      while (segmentEntry != null) {
-        val segment = segmentEntry.getValue
+        // Do the read on the segment with a base offset less than the target offset
+        // but if that segment doesn't contain any messages with an offset greater than that
+        // continue to read from successive segments until we get some messages or we reach the end of the log
+        var done = segmentEntry == null
+        var fetchDataInfo: FetchDataInfo = null
+        while (!done) {
+          val segment = segmentEntry.getValue
+
+          val maxPosition = {
+            // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
+            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
+              maxOffsetMetadata.relativePositionInSegment
+            } else {
+              segment.size
+            }

Review comment:
       Nit: we can remove the braces from the if/else.
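   
       Applied to the diff above, the nit would read roughly like this:
   
    ```scala
    val maxPosition =
      // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
      if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
      else segment.size
    ```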

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1535,42 +1533,48 @@ class Log(@volatile private var _dir: File,
       }
 
       if (startOffset == maxOffsetMetadata.messageOffset) {
-        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
       } else if (startOffset > maxOffsetMetadata.messageOffset) {
         val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
-        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
-      }
+        emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
+      } else {
 
-      // Do the read on the segment with a base offset less than the target offset
-      // but if that segment doesn't contain any messages with an offset greater than that
-      // continue to read from successive segments until we get some messages or we reach the end of the log
-      while (segmentEntry != null) {
-        val segment = segmentEntry.getValue
+        // Do the read on the segment with a base offset less than the target offset
+        // but if that segment doesn't contain any messages with an offset greater than that
+        // continue to read from successive segments until we get some messages or we reach the end of the log
+        var done = segmentEntry == null
+        var fetchDataInfo: FetchDataInfo = null
+        while (!done) {
+          val segment = segmentEntry.getValue
+
+          val maxPosition = {
+            // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
+            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
+              maxOffsetMetadata.relativePositionInSegment
+            } else {
+              segment.size
+            }
+          }
 
-        val maxPosition = {
-          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
-            maxOffsetMetadata.relativePositionInSegment
+          val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)

Review comment:
       Maybe something like the following would be clearer:
   
   ```scala
   fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
   if (fetchDataInfo != null) {
     if (includeAbortedTxns)
       fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
   } else segmentEntry = segments.higherEntry(segmentEntry.getKey)
   
   done = fetchDataInfo != null || segmentEntry == null
   ```
   
   What do you think?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1535,42 +1533,48 @@ class Log(@volatile private var _dir: File,
       }
 
       if (startOffset == maxOffsetMetadata.messageOffset) {
-        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
+        emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
       } else if (startOffset > maxOffsetMetadata.messageOffset) {
         val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
-        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
-      }
+        emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
+      } else {
 
-      // Do the read on the segment with a base offset less than the target offset
-      // but if that segment doesn't contain any messages with an offset greater than that
-      // continue to read from successive segments until we get some messages or we reach the end of the log
-      while (segmentEntry != null) {
-        val segment = segmentEntry.getValue
+        // Do the read on the segment with a base offset less than the target offset
+        // but if that segment doesn't contain any messages with an offset greater than that
+        // continue to read from successive segments until we get some messages or we reach the end of the log
+        var done = segmentEntry == null
+        var fetchDataInfo: FetchDataInfo = null
+        while (!done) {
+          val segment = segmentEntry.getValue
+
+          val maxPosition = {
+            // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
+            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
+              maxOffsetMetadata.relativePositionInSegment
+            } else {
+              segment.size
+            }
+          }
 
-        val maxPosition = {
-          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
-            maxOffsetMetadata.relativePositionInSegment
+          val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
+          if (fetchInfo == null) {
+            segmentEntry = segments.higherEntry(segmentEntry.getKey)
+            done = segmentEntry == null
           } else {
-            segment.size
+            fetchDataInfo = if (includeAbortedTxns) addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
+            else fetchInfo

Review comment:
       The formatting here is a bit weird (the `else` is aligned the same as the variable). I think it's easier to understand if the if/else are aligned and indented.
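   
       For example, reusing the names from the diff above:
   
    ```scala
    fetchDataInfo =
      if (includeAbortedTxns)
        addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
      else
        fetchInfo
    ```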










[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-673212958


   ```org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true]``` failed. I tested it on my local machine and it passed. retest this please





[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-716126171


   All green :)
   
   @ijuma @hachikuji Could you take a look?




