You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/05/24 16:35:55 UTC
[kafka] branch trunk updated: MINOR: Improve broker registration and Log logging (#8714)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5302efb MINOR: Improve broker registration and Log logging (#8714)
5302efb is described below
commit 5302efb2d1b7a69bcd3173a13b2d08a2666979ed
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun May 24 09:35:24 2020 -0700
MINOR: Improve broker registration and Log logging (#8714)
Broker registration previously:
> INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArraySeq(EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(localhost,9093,ListenerName(SSL),SSL)), czxid (broker epoch): 4294967320 (kafka.zk.KafkaZkClient)
Now:
> INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:9092,SSL://localhost:9093, czxid (broker epoch): 4294967320 (kafka.zk.KafkaZkClient)
The second improvement is to avoid logging messages like:
> "Deleting segments List()"
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
core/src/main/scala/kafka/log/Log.scala | 27 +++++++++++++-----------
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 ++-
2 files changed, 17 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d046368..4860726 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1694,9 +1694,10 @@ class Log(@volatile private var _dir: File,
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
- if (deletable.nonEmpty)
+ if (deletable.nonEmpty) {
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
- deleteSegments(deletable)
+ deleteSegments(deletable)
+ } else 0
}
}
@@ -2195,15 +2196,17 @@ class Log(@volatile private var _dir: File,
* @param asyncDelete Whether the segment files should be deleted asynchronously
*/
private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
- lock synchronized {
- // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
- // removing the deleted segment, we should force materialization of the iterator here, so that results of the
- // iteration remain valid and deterministic.
- val toDelete = segments.toList
- toDelete.foreach { segment =>
- this.segments.remove(segment.baseOffset)
+ if (segments.nonEmpty) {
+ lock synchronized {
+ // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
+ // removing the deleted segment, we should force materialization of the iterator here, so that results of the
+ // iteration remain valid and deterministic.
+ val toDelete = segments.toList
+ toDelete.foreach { segment =>
+ this.segments.remove(segment.baseOffset)
+ }
+ deleteSegmentFiles(toDelete, asyncDelete)
}
- deleteSegmentFiles(toDelete, asyncDelete)
}
}
@@ -2221,14 +2224,14 @@ class Log(@volatile private var _dir: File,
segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
def deleteSegments(): Unit = {
- info(s"Deleting segments $segments")
+ info(s"Deleting segments ${segments.mkString(",")}")
maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
segments.foreach(_.deleteIfExists())
}
}
if (asyncDelete) {
- info(s"Scheduling segments for deletion $segments")
+ info(s"Scheduling segments for deletion ${segments.mkString(",")}")
scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
} else {
deleteSegments()
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index b10ddbe..7f6f025 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -93,7 +93,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def registerBroker(brokerInfo: BrokerInfo): Long = {
val path = brokerInfo.path
val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
- info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}, czxid (broker epoch): ${stat.getCzxid}")
+ info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +
+ s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")
stat.getCzxid
}