You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/05/24 16:35:55 UTC

[kafka] branch trunk updated: MINOR: Improve broker registration and Log logging (#8714)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5302efb  MINOR: Improve broker registration and Log logging (#8714)
5302efb is described below

commit 5302efb2d1b7a69bcd3173a13b2d08a2666979ed
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun May 24 09:35:24 2020 -0700

    MINOR: Improve broker registration and Log logging (#8714)
    
    Broker registration previously:
    
    > INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArraySeq(EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(localhost,9093,ListenerName(SSL),SSL)), czxid (broker epoch):
    4294967320 (kafka.zk.KafkaZkClient)
    
    Now:
    
    > INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:9092,SSL://localhost:9093, czxid (broker epoch):
    4294967320 (kafka.zk.KafkaZkClient)
    
    The second improvement is to avoid logging messages like:
    
    > "Deleting segments List()"
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 27 +++++++++++++-----------
 core/src/main/scala/kafka/zk/KafkaZkClient.scala |  3 ++-
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d046368..4860726 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1694,9 +1694,10 @@ class Log(@volatile private var _dir: File,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
-      if (deletable.nonEmpty)
+      if (deletable.nonEmpty) {
         info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
-      deleteSegments(deletable)
+        deleteSegments(deletable)
+      } else 0
     }
   }
 
@@ -2195,15 +2196,17 @@ class Log(@volatile private var _dir: File,
    * @param asyncDelete Whether the segment files should be deleted asynchronously
    */
   private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
-    lock synchronized {
-      // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
-      // removing the deleted segment, we should force materialization of the iterator here, so that results of the
-      // iteration remain valid and deterministic.
-      val toDelete = segments.toList
-      toDelete.foreach { segment =>
-        this.segments.remove(segment.baseOffset)
+    if (segments.nonEmpty) {
+      lock synchronized {
+        // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
+        // removing the deleted segment, we should force materialization of the iterator here, so that results of the
+        // iteration remain valid and deterministic.
+        val toDelete = segments.toList
+        toDelete.foreach { segment =>
+          this.segments.remove(segment.baseOffset)
+        }
+        deleteSegmentFiles(toDelete, asyncDelete)
       }
-      deleteSegmentFiles(toDelete, asyncDelete)
     }
   }
 
@@ -2221,14 +2224,14 @@ class Log(@volatile private var _dir: File,
     segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
-      info(s"Deleting segments $segments")
+      info(s"Deleting segments ${segments.mkString(",")}")
       maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
         segments.foreach(_.deleteIfExists())
       }
     }
 
     if (asyncDelete) {
-      info(s"Scheduling segments for deletion $segments")
+      info(s"Scheduling segments for deletion ${segments.mkString(",")}")
       scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
     } else {
       deleteSegments()
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index b10ddbe..7f6f025 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -93,7 +93,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   def registerBroker(brokerInfo: BrokerInfo): Long = {
     val path = brokerInfo.path
     val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
-    info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}, czxid (broker epoch): ${stat.getCzxid}")
+    info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +
+      s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")
     stat.getCzxid
   }