You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/06/04 16:41:48 UTC
[kafka] branch trunk updated: MINOR: LogLoader: Add log identifier
at few missing areas (#10819)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5ef9022 MINOR: LogLoader: Add log identifier at few missing areas (#10819)
5ef9022 is described below
commit 5ef90226f8936e06bfc06665ac81cdb2a7f386c4
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Fri Jun 4 09:40:36 2021 -0700
MINOR: LogLoader: Add log identifier at few missing areas (#10819)
Reviewers: Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/LogLoader.scala | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index ef9ba00..0a9222e 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -373,7 +373,7 @@ object LogLoader extends Logging {
params.config,
time = params.time,
fileSuffix = Log.SwapFileSuffix)
- info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
+ info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
recoverSegment(swapSegment, params)
// We create swap files for two cases:
@@ -425,8 +425,9 @@ object LogLoader extends Logging {
if (logEndOffset >= params.logStartOffsetCheckpoint)
Some(logEndOffset)
else {
- warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ${params.logStartOffsetCheckpoint}. " +
- "This could happen if segment files were deleted from the file system.")
+ warn(s"${params.logIdentifier}Deleting all segments because logEndOffset ($logEndOffset) " +
+ s" smaller than logStartOffset ${params.logStartOffsetCheckpoint}." +
+ " This could happen if segment files were deleted from the file system.")
removeAndDeleteSegmentsAsync(params.segments.values, params)
params.leaderEpochCache.foreach(_.clearAndFlush())
params.producerStateManager.truncateFullyAndStartAt(params.logStartOffsetCheckpoint)
@@ -514,7 +515,7 @@ object LogLoader extends Logging {
// materialization of the iterator here, so that results of the iteration remain valid and
// deterministic.
val toDelete = segmentsToDelete.toList
- info(s"Deleting segments as part of log recovery: ${toDelete.mkString(",")}")
+ info(s"${params.logIdentifier}Deleting segments as part of log recovery: ${toDelete.mkString(",")}")
toDelete.foreach { segment =>
params.segments.remove(segment.baseOffset)
}