You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/21 21:53:41 UTC

[kafka] branch 2.3 updated: MINOR: A few logging improvements in the broker (#6773)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 09f6b0e  MINOR: A few logging improvements in the broker (#6773)
09f6b0e is described below

commit 09f6b0e632dbbdd8eafeb866c393cf530a9af85b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 21 14:50:24 2019 -0700

    MINOR: A few logging improvements in the broker (#6773)
    
    Reviewers: Boyang Chen <bc...@outlook.com>, Rajini Sivaram <ra...@googlemail.com>
---
 core/src/main/scala/kafka/log/Log.scala                       |  7 -------
 core/src/main/scala/kafka/server/AbstractFetcherManager.scala |  3 ++-
 core/src/main/scala/kafka/server/ReplicaManager.scala         | 11 ++++++++---
 .../main/scala/kafka/server/epoch/LeaderEpochFileCache.scala  |  5 ++++-
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ef786be..56b2969 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -230,13 +230,6 @@ class Log(@volatile var dir: File,
   }
 
   def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
-    if ((updatedKeys.contains(LogConfig.RetentionMsProp)
-      || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
-      && topicPartition.partition == 0  // generate warnings only for one partition of each topic
-      && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
-      warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " +
-        s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. " +
-        s"This may result in frequent log rolling.")
     val oldConfig = this.config
     this.config = newConfig
     if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index a5faf0e..53152eb 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -174,7 +174,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
         fetcher.removePartitions(partitions)
       failedPartitions.removeAll(partitions)
     }
-    info(s"Removed fetcher for partitions $partitions")
+    if (partitions.nonEmpty)
+      info(s"Removed fetcher for partitions $partitions")
   }
 
   def shutdownIdleFetcherThreads() {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cccccfe..2023a97 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1084,14 +1084,19 @@ class ReplicaManager(val config: KafkaConfig,
                 s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
               responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
-          } else {
-            // Otherwise record the error code in response
+          } else if (requestLeaderEpoch < currentLeaderEpoch) {
             stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
               s"controller $controllerId with correlation id $correlationId " +
               s"epoch $controllerEpoch for partition $topicPartition since its associated " +
-              s"leader epoch $requestLeaderEpoch is not higher than the current " +
+              s"leader epoch $requestLeaderEpoch is smaller than the current " +
               s"leader epoch $currentLeaderEpoch")
             responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+          } else {
+            stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
+              s"controller $controllerId with correlation id $correlationId " +
+              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
+              s"leader epoch $requestLeaderEpoch matches the current leader epoch")
+            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
           }
         }
 
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 0c885b7..7219ee1 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -78,7 +78,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
 
     if (removedEpochs.isEmpty) {
       debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
-    } else {
+    } else if (removedEpochs.size > 1 || removedEpochs.head.startOffset != entryToAppend.startOffset) {
+      // Only log a warning if there were non-trivial removals. If the start offset of the new entry
+      // matches the start offfset of the removed epoch, then no data has been written and the truncation
+      // is expected.
       warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
         s"Cache now contains ${epochs.size} entries.")
     }