You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/21 21:53:41 UTC
[kafka] branch 2.3 updated: MINOR: A few logging improvements in the broker (#6773)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 09f6b0e MINOR: A few logging improvements in the broker (#6773)
09f6b0e is described below
commit 09f6b0e632dbbdd8eafeb866c393cf530a9af85b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 21 14:50:24 2019 -0700
MINOR: A few logging improvements in the broker (#6773)
Reviewers: Boyang Chen <bc...@outlook.com>, Rajini Sivaram <ra...@googlemail.com>
---
core/src/main/scala/kafka/log/Log.scala | 7 -------
core/src/main/scala/kafka/server/AbstractFetcherManager.scala | 3 ++-
core/src/main/scala/kafka/server/ReplicaManager.scala | 11 ++++++++---
.../main/scala/kafka/server/epoch/LeaderEpochFileCache.scala | 5 ++++-
4 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ef786be..56b2969 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -230,13 +230,6 @@ class Log(@volatile var dir: File,
}
def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
- if ((updatedKeys.contains(LogConfig.RetentionMsProp)
- || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
- && topicPartition.partition == 0 // generate warnings only for one partition of each topic
- && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
- warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " +
- s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. " +
- s"This may result in frequent log rolling.")
val oldConfig = this.config
this.config = newConfig
if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index a5faf0e..53152eb 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -174,7 +174,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
fetcher.removePartitions(partitions)
failedPartitions.removeAll(partitions)
}
- info(s"Removed fetcher for partitions $partitions")
+ if (partitions.nonEmpty)
+ info(s"Removed fetcher for partitions $partitions")
}
def shutdownIdleFetcherThreads() {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cccccfe..2023a97 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1084,14 +1084,19 @@ class ReplicaManager(val config: KafkaConfig,
s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
- } else {
- // Otherwise record the error code in response
+ } else if (requestLeaderEpoch < currentLeaderEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
- s"leader epoch $requestLeaderEpoch is not higher than the current " +
+ s"leader epoch $requestLeaderEpoch is smaller than the current " +
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+ } else {
+ stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
+ s"controller $controllerId with correlation id $correlationId " +
+ s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+ s"leader epoch $requestLeaderEpoch matches the current leader epoch")
+ responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 0c885b7..7219ee1 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -78,7 +78,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
if (removedEpochs.isEmpty) {
debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
- } else {
+ } else if (removedEpochs.size > 1 || removedEpochs.head.startOffset != entryToAppend.startOffset) {
+ // Only log a warning if there were non-trivial removals. If the start offset of the new entry
+ // matches the start offset of the removed epoch, then no data has been written and the truncation
+ // is expected.
warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}