Posted to commits@kafka.apache.org by jl...@apache.org on 2024/02/19 15:54:57 UTC

(kafka) branch trunk updated: MINOR: Clean up core modules (#15279)

This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b71999be953 MINOR: Clean up core modules (#15279)
b71999be953 is described below

commit b71999be95325f6ea54e925cbe5b426425781014
Author: Josep Prat <jo...@aiven.io>
AuthorDate: Mon Feb 19 16:54:50 2024 +0100

    MINOR: Clean up core modules (#15279)
    
    This PR cleans up classes in the metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper packages:
    
    Mark methods and fields private where possible
    Annotate public methods and fields with explicit types
    Remove unused classes and methods
    Make sure arrays are not printed with .toString
    Resolve minor warnings
    
    Reviewers: Mickael Maison <mi...@gmail.com>
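
The cleanup items listed in the commit message follow a common pattern. The minimal Scala sketch below is illustrative only and is not taken from the Kafka codebase; the object name CleanupExamples and its members are hypothetical. It shows explicit type annotations on public members, private scoping for internals, printing arrays with mkString instead of .toString, and named boolean arguments (as in the KafkaConfig.fromProps(props, doLog = false) change further down in the diff).

    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical object, used only to illustrate the cleanup patterns
    // applied throughout this commit.
    object CleanupExamples {

      // Public members carry explicit type annotations so the public API
      // does not rely on type inference.
      val DefaultPrefix: String = "example."

      // Members used only inside the object are marked private.
      private val started: AtomicBoolean = new AtomicBoolean(false)

      // Arrays do not override toString, so interpolating one directly
      // prints something like "[Ljava.lang.String;@1b6d3586". Use mkString
      // to print the elements instead.
      def describe(files: Array[String]): String =
        s"files=${files.mkString(", ")}"

      // Boolean arguments are passed by name at the call site so the intent
      // stays readable.
      def start(verbose: Boolean): Boolean = {
        if (verbose) println(s"${DefaultPrefix}starting")
        started.compareAndSet(false, true)
      }

      def main(args: Array[String]): Unit = {
        println(describe(Array("a.log", "b.log")))
        println(start(verbose = false))
      }
    }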
---
 core/src/main/scala/kafka/Kafka.scala              |  2 +-
 .../scala/kafka/metrics/KafkaMetricsReporter.scala |  2 +-
 .../kafka/metrics/LinuxIoMetricsCollector.scala    | 19 +++--
 .../kafka/migration/MigrationPropagator.scala      | 10 +--
 .../main/scala/kafka/network/RequestChannel.scala  | 94 +++++++++++-----------
 .../main/scala/kafka/network/SocketServer.scala    | 44 +++++-----
 .../kafka/raft/TimingWheelExpirationService.scala  |  2 +-
 .../scala/kafka/security/CredentialProvider.scala  | 11 ---
 .../kafka/security/authorizer/AclAuthorizer.scala  | 30 +++----
 .../scala/kafka/security/authorizer/AclEntry.scala | 16 ++--
 core/src/main/scala/kafka/serializer/Decoder.scala |  2 +-
 .../main/scala/kafka/tools/ConsoleProducer.scala   | 89 ++++++++++----------
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 53 ++++++------
 core/src/main/scala/kafka/tools/StorageTool.scala  | 36 ++++-----
 .../main/scala/kafka/tools/TestRaftServer.scala    | 26 +++---
 core/src/main/scala/kafka/utils/CoreUtils.scala    |  2 +-
 core/src/main/scala/kafka/utils/Logging.scala      |  2 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   | 11 +--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 46 +++++------
 core/src/main/scala/kafka/zk/ZkData.scala          | 26 +++---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |  7 +-
 .../zk/migration/ZkConfigMigrationClient.scala     |  6 +-
 .../ZkDelegationTokenMigrationClient.scala         |  2 +-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  8 +-
 24 files changed, 259 insertions(+), 287 deletions(-)

diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index efa62860514..6b5389fbe32 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -68,7 +68,7 @@ object Kafka extends Logging {
     config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled
 
   private def buildServer(props: Properties): Server = {
-    val config = KafkaConfig.fromProps(props, false)
+    val config = KafkaConfig.fromProps(props, doLog = false)
     if (config.requiresZookeeper) {
       new KafkaServer(
         config,
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 6a50dde69d5..136bb88b289 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -52,7 +52,7 @@ trait KafkaMetricsReporter {
 }
 
 object KafkaMetricsReporter {
-  val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
+  private val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
   private var reporters: ArrayBuffer[KafkaMetricsReporter] = _
 
   def startReporters(verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
index 11602affb1f..7c56cfca55b 100644
--- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -17,7 +17,7 @@
 
 package kafka.metrics
 
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, Path, Paths}
 import org.apache.kafka.common.utils.Time
 import org.slf4j.Logger
 
@@ -29,10 +29,10 @@ import scala.jdk.CollectionConverters._
  */
 class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) {
   import LinuxIoMetricsCollector._
-  var lastUpdateMs = -1L
-  var cachedReadBytes = 0L
-  var cachedWriteBytes = 0L
-  val path = Paths.get(procRoot, "self", "io")
+  var lastUpdateMs: Long = -1L
+  var cachedReadBytes:Long = 0L
+  var cachedWriteBytes:Long = 0L
+  val path: Path = Paths.get(procRoot, "self", "io")
 
   def readBytes(): Long = this.synchronized {
     val curMs = time.milliseconds()
@@ -64,7 +64,7 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
    * write_bytes: 0
    * cancelled_write_bytes: 0
    */
-  def updateValues(now: Long): Boolean = this.synchronized {
+  private def updateValues(now: Long): Boolean = this.synchronized {
     try {
       cachedReadBytes = -1
       cachedWriteBytes = -1
@@ -79,10 +79,9 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
       lastUpdateMs = now
       true
     } catch {
-      case t: Throwable => {
+      case t: Throwable =>
         logger.warn("Unable to update IO metrics", t)
         false
-      }
     }
   }
 
@@ -97,6 +96,6 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
 }
 
 object LinuxIoMetricsCollector {
-  val READ_BYTES_PREFIX = "read_bytes: "
-  val WRITE_BYTES_PREFIX = "write_bytes: "
+  private val READ_BYTES_PREFIX = "read_bytes: "
+  private val WRITE_BYTES_PREFIX = "write_bytes: "
 }
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 7bf0fc3ff56..678bc7a7192 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -66,7 +66,7 @@ class MigrationPropagator(
     stateChangeLogger
   )
 
-  val requestBatch = new MigrationPropagatorBatch(
+  private val requestBatch = new MigrationPropagatorBatch(
     config,
     metadataProvider,
     () => _image.features().metadataVersion(),
@@ -103,11 +103,11 @@ class MigrationPropagator(
    * A very expensive function that creates a map with an entry for every partition that exists, from
    * (topic name, partition index) to partition registration.
    */
-  def materializePartitions(topicsImage: TopicsImage): util.Map[TopicPartition, PartitionRegistration] = {
+  private def materializePartitions(topicsImage: TopicsImage): util.Map[TopicPartition, PartitionRegistration] = {
     val result = new util.HashMap[TopicPartition, PartitionRegistration]()
-    topicsImage.topicsById().values().forEach(topic => {
-      topic.partitions().forEach((key, value) => result.put(new TopicPartition(topic.name(), key), value));
-    })
+    topicsImage.topicsById().values().forEach(topic =>
+      topic.partitions().forEach((key, value) => result.put(new TopicPartition(topic.name(), key), value))
+    )
     result
   }
 
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 007049814cb..679101e7d60 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.util.concurrent._
 import com.fasterxml.jackson.databind.JsonNode
 import com.typesafe.scalalogging.Logger
-import com.yammer.metrics.core.Meter
+import com.yammer.metrics.core.{Histogram, Meter}
 import kafka.network
 import kafka.server.{KafkaConfig, RequestLocal}
 import kafka.utils.{Logging, NotNothing, Pool}
@@ -46,11 +46,11 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
-  val RequestQueueSizeMetric = "RequestQueueSize"
-  val ResponseQueueSizeMetric = "ResponseQueueSize"
+  private val RequestQueueSizeMetric = "RequestQueueSize"
+  private val ResponseQueueSizeMetric = "ResponseQueueSize"
   val ProcessorMetricTag = "processor"
 
-  def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
+  private def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
 
   sealed trait BaseRequest
   case object ShutdownRequest extends BaseRequest
@@ -87,18 +87,18 @@ object RequestChannel extends Logging {
                 val envelope: Option[RequestChannel.Request] = None) extends BaseRequest {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
-    @volatile var requestDequeueTimeNanos = -1L
-    @volatile var apiLocalCompleteTimeNanos = -1L
-    @volatile var responseCompleteTimeNanos = -1L
-    @volatile var responseDequeueTimeNanos = -1L
-    @volatile var messageConversionsTimeNanos = 0L
-    @volatile var apiThrottleTimeMs = 0L
-    @volatile var temporaryMemoryBytes = 0L
+    @volatile var requestDequeueTimeNanos: Long = -1L
+    @volatile var apiLocalCompleteTimeNanos: Long = -1L
+    @volatile var responseCompleteTimeNanos: Long = -1L
+    @volatile var responseDequeueTimeNanos: Long = -1L
+    @volatile var messageConversionsTimeNanos: Long = 0L
+    @volatile var apiThrottleTimeMs: Long = 0L
+    @volatile var temporaryMemoryBytes: Long = 0L
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
     @volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
     @volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
 
-    val session = new Session(context.principal, context.clientAddress)
+    val session: Session = new Session(context.principal, context.clientAddress)
 
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
@@ -110,7 +110,7 @@ object RequestChannel extends Logging {
 
     def header: RequestHeader = context.header
 
-    def sizeOfBodyInBytes: Int = bodyAndSize.size
+    private def sizeOfBodyInBytes: Int = bodyAndSize.size
 
     def sizeInBytes: Int = header.size + sizeOfBodyInBytes
 
@@ -293,7 +293,7 @@ object RequestChannel extends Logging {
       }
     }
 
-    override def toString = s"Request(processor=$processor, " +
+    override def toString: String = s"Request(processor=$processor, " +
       s"connectionId=${context.connectionId}, " +
       s"session=$session, " +
       s"listenerName=${context.listenerName}, " +
@@ -356,8 +356,8 @@ class RequestChannel(val queueSize: Int,
 
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
-  val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
-  val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
+  private val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
+  private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
   private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
 
   metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size)
@@ -511,24 +511,24 @@ class RequestChannel(val queueSize: Int,
 }
 
 object RequestMetrics {
-  val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
-  val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
-
-  val verifyPartitionsInTxnMetricName = ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification"
-
-  val RequestsPerSec = "RequestsPerSec"
-  val DeprecatedRequestsPerSec = "DeprecatedRequestsPerSec"
-  val RequestQueueTimeMs = "RequestQueueTimeMs"
-  val LocalTimeMs = "LocalTimeMs"
-  val RemoteTimeMs = "RemoteTimeMs"
-  val ThrottleTimeMs = "ThrottleTimeMs"
-  val ResponseQueueTimeMs = "ResponseQueueTimeMs"
-  val ResponseSendTimeMs = "ResponseSendTimeMs"
-  val TotalTimeMs = "TotalTimeMs"
-  val RequestBytes = "RequestBytes"
-  val MessageConversionsTimeMs = "MessageConversionsTimeMs"
-  val TemporaryMemoryBytes = "TemporaryMemoryBytes"
-  val ErrorsPerSec = "ErrorsPerSec"
+  val consumerFetchMetricName: String = ApiKeys.FETCH.name + "Consumer"
+  val followFetchMetricName: String = ApiKeys.FETCH.name + "Follower"
+
+  val verifyPartitionsInTxnMetricName: String = ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification"
+
+  val RequestsPerSec: String = "RequestsPerSec"
+  val DeprecatedRequestsPerSec: String = "DeprecatedRequestsPerSec"
+  private val RequestQueueTimeMs = "RequestQueueTimeMs"
+  private val LocalTimeMs = "LocalTimeMs"
+  private val RemoteTimeMs = "RemoteTimeMs"
+  private val ThrottleTimeMs = "ThrottleTimeMs"
+  private val ResponseQueueTimeMs = "ResponseQueueTimeMs"
+  private val ResponseSendTimeMs = "ResponseSendTimeMs"
+  private val TotalTimeMs = "TotalTimeMs"
+  private val RequestBytes = "RequestBytes"
+  val MessageConversionsTimeMs: String = "MessageConversionsTimeMs"
+  val TemporaryMemoryBytes: String = "TemporaryMemoryBytes"
+  val ErrorsPerSec: String = "ErrorsPerSec"
 }
 
 private case class DeprecatedRequestRateKey(version: Short, clientInformation: ClientInformation)
@@ -539,34 +539,34 @@ class RequestMetrics(name: String) {
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
-  val tags = Map("request" -> name).asJava
-  val requestRateInternal = new Pool[Short, Meter]()
+  val tags: util.Map[String, String] = Map("request" -> name).asJava
+  private val requestRateInternal = new Pool[Short, Meter]()
   private val deprecatedRequestRateInternal = new Pool[DeprecatedRequestRateKey, Meter]()
   // time a request spent in a request queue
-  val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags)
+  val requestQueueTimeHist: Histogram = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags)
+  val localTimeHist: Histogram = metricsGroup.newHistogram(LocalTimeMs, true, tags)
   // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
-  val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags)
+  val remoteTimeHist: Histogram = metricsGroup.newHistogram(RemoteTimeMs, true, tags)
   // time a request is throttled, not part of the request processing time (throttling is done at the client level
   // for clients that support KIP-219 and by muting the channel for the rest)
-  val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags)
+  val throttleTimeHist: Histogram = metricsGroup.newHistogram(ThrottleTimeMs, true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs, true, tags)
+  val responseQueueTimeHist: Histogram = metricsGroup.newHistogram(ResponseQueueTimeMs, true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs, true, tags)
-  val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags)
+  val responseSendTimeHist: Histogram = metricsGroup.newHistogram(ResponseSendTimeMs, true, tags)
+  val totalTimeHist: Histogram = metricsGroup.newHistogram(TotalTimeMs, true, tags)
   // request size in bytes
-  val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags)
+  val requestBytesHist: Histogram = metricsGroup.newHistogram(RequestBytes, true, tags)
   // time for message conversions (only relevant to fetch and produce requests)
-  val messageConversionsTimeHist =
+  val messageConversionsTimeHist: Option[Histogram] =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
       Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags))
     else
       None
   // Temporary memory allocated for processing request (only populated for fetch and produce requests)
   // This shows the memory allocated for compression/conversions excluding the actual request size
-  val tempMemoryBytesHist =
+  val tempMemoryBytesHist: Option[Histogram] =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
       Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags))
     else
@@ -600,7 +600,7 @@ class RequestMetrics(name: String) {
     extendedTags
   }
 
-  class ErrorMeter(name: String, error: Errors) {
+  private class ErrorMeter(name: String, error: Errors) {
     private val tags = Map("request" -> name, "error" -> error.name).asJava
 
     @volatile private var meter: Meter = _
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 50da12f5da6..e90af2e18ca 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -85,7 +85,7 @@ class SocketServer(val config: KafkaConfig,
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
-  protected val nodeId = config.brokerId
+  protected val nodeId: Int = config.brokerId
 
   private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")
 
@@ -239,7 +239,7 @@ class SocketServer(val config: KafkaConfig,
     enableFuture
   }
 
-  def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
+  private def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
     if (stopped) {
       throw new RuntimeException("Can't create new data plane acceptor and processors: SocketServer is stopped.")
     }
@@ -404,13 +404,13 @@ class SocketServer(val config: KafkaConfig,
 object SocketServer {
   val MetricsGroup = "socket-server-metrics"
 
-  val ReconfigurableConfigs = Set(
+  val ReconfigurableConfigs: Set[String] = Set(
     KafkaConfig.MaxConnectionsPerIpProp,
     KafkaConfig.MaxConnectionsPerIpOverridesProp,
     KafkaConfig.MaxConnectionsProp,
     KafkaConfig.MaxConnectionCreationRateProp)
 
-  val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp)
+  val ListenerReconfigurableConfigs: Set[String] = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp)
 
   def closeSocket(
     channel: SocketChannel,
@@ -422,9 +422,9 @@ object SocketServer {
 }
 
 object DataPlaneAcceptor {
-  val ThreadPrefix = "data-plane"
-  val MetricPrefix = ""
-  val ListenerReconfigurableConfigs = Set(KafkaConfig.NumNetworkThreadsProp)
+  val ThreadPrefix: String = "data-plane"
+  val MetricPrefix: String = ""
+  val ListenerReconfigurableConfigs: Set[String] = Set(KafkaConfig.NumNetworkThreadsProp)
 }
 
 class DataPlaneAcceptor(socketServer: SocketServer,
@@ -481,7 +481,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
    */
   override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
     configs.forEach { (k, v) =>
-      if (reconfigurableConfigs.contains(k)) {
+      if (reconfigurableConfigs().contains(k)) {
         val newValue = v.asInstanceOf[Int]
         val oldValue = processors.length
         if (newValue != oldValue) {
@@ -501,8 +501,8 @@ class DataPlaneAcceptor(socketServer: SocketServer,
    * Reconfigures this instance with the given key-value pairs. The provided
    * map contains all configs including any reconfigurable configs that
    * may have changed since the object was initially configured using
-   * {@link Configurable# configure ( Map )}. This method will only be invoked if
-   * the configs have passed validation using {@link #validateReconfiguration ( Map )}.
+   * [[org.apache.kafka.common.Configurable#configure( Map )]]. This method will only be invoked if
+   * the configs have passed validation using [[validateReconfiguration( Map )]].
    */
   override def reconfigure(configs: util.Map[String, _]): Unit = {
     val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int]
@@ -559,12 +559,6 @@ class ControlPlaneAcceptor(socketServer: SocketServer,
   override def metricPrefix(): String = ControlPlaneAcceptor.MetricPrefix
   override def threadPrefix(): String = ControlPlaneAcceptor.ThreadPrefix
 
-  def processorOpt(): Option[Processor] = {
-    if (processors.isEmpty)
-      None
-    else
-      Some(processors.apply(0))
-  }
 }
 
 /**
@@ -607,8 +601,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
     endPoint.port
   } else {
     serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
-    val newPort = serverChannel.socket().getLocalPort()
-    info(s"Opened wildcard endpoint ${endPoint.host}:${newPort}")
+    val newPort = serverChannel.socket().getLocalPort
+    info(s"Opened wildcard endpoint ${endPoint.host}:$newPort")
     newPort
   }
 
@@ -625,7 +619,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var started = false
   private[network] val startedFuture = new CompletableFuture[Void]()
 
-  val thread = KafkaThread.nonDaemon(
+  val thread: KafkaThread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
@@ -859,7 +853,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   }
 
   def newProcessor(id: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol): Processor = {
-    val name = s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${id}"
+    val name = s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
     new Processor(id,
                   time,
                   config.socketRequestMaxBytes,
@@ -882,7 +876,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
 }
 
 private[kafka] object Processor {
-  val IdlePercentMetricName = "IdlePercent"
+  private val IdlePercentMetricName = "IdlePercent"
   val NetworkProcessorMetricTag = "networkProcessor"
   val ListenerMetricTag = "listener"
   val ConnectionQueueSize = 20
@@ -920,9 +914,9 @@ private[kafka] class Processor(
 ) extends Runnable with Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
-  val shouldRun = new AtomicBoolean(true)
+  val shouldRun: AtomicBoolean = new AtomicBoolean(true)
 
-  val thread = KafkaThread.nonDaemon(threadName, this)
+  val thread: KafkaThread = KafkaThread.nonDaemon(threadName, this)
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -957,7 +951,7 @@ private[kafka] class Processor(
     Map(NetworkProcessorMetricTag -> id.toString).asJava
   )
 
-  val expiredConnectionsKilledCount = new CumulativeSum()
+  private val expiredConnectionsKilledCount = new CumulativeSum()
   private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", MetricsGroup, metricTags)
   metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
 
@@ -1114,7 +1108,7 @@ private[kafka] class Processor(
     }
   }
 
-  protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
+  private def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
     val header = RequestHeader.parse(buffer)
     if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
       header
diff --git a/core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala b/core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala
index 449574fad4a..3c330fb6f2e 100644
--- a/core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala
+++ b/core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.server.util.timer.{Timer, TimerTask}
 object TimingWheelExpirationService {
   private val WorkTimeoutMs: Long = 200L
 
-  class TimerTaskCompletableFuture[T](delayMs: Long) extends TimerTask(delayMs) {
+  private class TimerTaskCompletableFuture[T](delayMs: Long) extends TimerTask(delayMs) {
     val future = new CompletableFuture[T]
     override def run(): Unit = {
       future.completeExceptionally(new TimeoutException(
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 85951a526a6..6c58ff0a5dd 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -21,8 +21,6 @@ import java.util.{Collection, Properties}
 import org.apache.kafka.clients.admin.{ScramMechanism => AdminScramMechanism}
 import org.apache.kafka.common.security.authenticator.CredentialCache
 import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.common.config.ConfigDef._
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramMechanism}
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 
@@ -62,12 +60,3 @@ class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: De
     }
   }
 }
-
-object CredentialProvider {
-  def userCredentialConfigs: ConfigDef = {
-    ScramMechanism.values.foldLeft(new ConfigDef) {
-      (c, m) => c.define(m.mechanismName, Type.STRING, null, Importance.MEDIUM, s"User credentials for SCRAM mechanism ${m.mechanismName}")
-    }
-  }
-}
-
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 34070d16378..99642b33c2b 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -48,23 +48,23 @@ import scala.util.{Failure, Random, Success, Try}
 object AclAuthorizer {
   // Optional override zookeeper cluster configuration where acls will be stored. If not specified,
   // acls will be stored in the same zookeeper where all other kafka broker metadata is stored.
-  val configPrefix = "authorizer."
-  val ZkUrlProp = s"${configPrefix}zookeeper.url"
-  val ZkConnectionTimeOutProp = s"${configPrefix}zookeeper.connection.timeout.ms"
-  val ZkSessionTimeOutProp = s"${configPrefix}zookeeper.session.timeout.ms"
-  val ZkMaxInFlightRequests = s"${configPrefix}zookeeper.max.in.flight.requests"
+  val configPrefix: String = "authorizer."
+  private val ZkUrlProp = s"${configPrefix}zookeeper.url"
+  private val ZkConnectionTimeOutProp = s"${configPrefix}zookeeper.connection.timeout.ms"
+  private val ZkSessionTimeOutProp = s"${configPrefix}zookeeper.session.timeout.ms"
+  private val ZkMaxInFlightRequests = s"${configPrefix}zookeeper.max.in.flight.requests"
 
   // Semi-colon separated list of users that will be treated as super users and will have access to all the resources
   // for all actions from all hosts, defaults to no super users.
-  val SuperUsersProp = "super.users"
+  val SuperUsersProp: String = "super.users"
   // If set to true when no acls are found for a resource, authorizer allows access to everyone. Defaults to false.
-  val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
+  val AllowEveryoneIfNoAclIsFoundProp: String = "allow.everyone.if.no.acl.found"
 
   case class VersionedAcls(acls: Set[AclEntry], zkVersion: Int) {
     def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
   }
 
-  class AclSeqs(seqs: Seq[AclEntry]*) {
+  private class AclSeqs(seqs: Seq[AclEntry]*) {
     def find(p: AclEntry => Boolean): Option[AclEntry] = {
       // Lazily iterate through the inner `Seq` elements and stop as soon as we find a match
       val it = seqs.iterator.flatMap(_.find(p))
@@ -75,8 +75,8 @@ object AclAuthorizer {
     def isEmpty: Boolean = !seqs.exists(_.nonEmpty)
   }
 
-  val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
-  val WildcardHost = "*"
+  val NoAcls: VersionedAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
+  val WildcardHost: String = "*"
 
   // Orders by resource type, then resource pattern type and finally reverse ordering by name.
   class ResourceOrdering extends Ordering[ResourcePattern] {
@@ -103,7 +103,7 @@ object AclAuthorizer {
     else {
       // start with the base config from the Kafka configuration
       // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
-      val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
+      val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true)
       // add in any prefixed overlays
       KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) =>
         configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
@@ -148,7 +148,7 @@ object AclAuthorizer {
     }
   }
 
-  def getAclsFromZk(zkClient: KafkaZkClient, resource: ResourcePattern): VersionedAcls = {
+  private def getAclsFromZk(zkClient: KafkaZkClient, resource: ResourcePattern): VersionedAcls = {
     zkClient.getVersionedAclsForResource(resource)
   }
 }
@@ -525,7 +525,7 @@ class AclAuthorizer extends Authorizer with Logging {
     if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
   }
 
-  def isSuperUser(principal: KafkaPrincipal): Boolean = {
+  private def isSuperUser(principal: KafkaPrincipal): Boolean = {
     if (superUsers.contains(principal)) {
       authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
       true
@@ -596,7 +596,7 @@ class AclAuthorizer extends Authorizer with Logging {
     }
   }
 
-  def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
+  private def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
     def logMessage: String = {
       val principal = requestContext.principal
       val operation = SecurityUtils.operationName(action.operation)
@@ -753,7 +753,7 @@ class AclAuthorizer extends Authorizer with Logging {
     }
   }
 
-  object AclChangedNotificationHandler extends AclChangeNotificationHandler {
+  private object AclChangedNotificationHandler extends AclChangeNotificationHandler {
     override def processNotification(resource: ResourcePattern): Unit = {
       processAclChangeNotification(resource)
     }
diff --git a/core/src/main/scala/kafka/security/authorizer/AclEntry.scala b/core/src/main/scala/kafka/security/authorizer/AclEntry.scala
index 9e2d49fc883..be54746c991 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclEntry.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclEntry.scala
@@ -33,19 +33,19 @@ object AclEntry {
   val WildcardHost: String = "*"
   val WildcardResource: String = ResourcePattern.WILDCARD_RESOURCE
 
-  val ResourceSeparator = ":"
+  val ResourceSeparator: String = ":"
   val ResourceTypes: Set[ResourceType] = ResourceType.values.toSet
     .filterNot(t => t == ResourceType.UNKNOWN || t == ResourceType.ANY)
   val AclOperations: Set[AclOperation] = AclOperation.values.toSet
     .filterNot(t => t == AclOperation.UNKNOWN || t == AclOperation.ANY)
 
-  val PrincipalKey = "principal"
-  val PermissionTypeKey = "permissionType"
-  val OperationKey = "operation"
-  val HostsKey = "host"
-  val VersionKey = "version"
-  val CurrentVersion = 1
-  val AclsKey = "acls"
+  private val PrincipalKey = "principal"
+  private val PermissionTypeKey = "permissionType"
+  private val OperationKey = "operation"
+  private val HostsKey = "host"
+  val VersionKey: String = "version"
+  val CurrentVersion: Int = 1
+  private val AclsKey = "acls"
 
   def apply(principal: KafkaPrincipal,
             permissionType: AclPermissionType,
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 1ad55410081..ce166689cfa 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -43,7 +43,7 @@ class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[B
  * an optional property serializer.encoding to control this.
  */
 class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
-  val encoding =
+  val encoding: String =
     if (props == null)
       StandardCharsets.UTF_8.name()
     else
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 37a8c807c80..0f12a49b00a 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.Properties
 import java.util.regex.Pattern
-import joptsimple.{OptionException, OptionParser, OptionSet}
+import joptsimple.{OptionException, OptionParser, OptionSet, OptionSpec}
 import kafka.common.MessageReader
 import kafka.utils.Implicits._
 import kafka.utils.{Exit, Logging, ToolsUtils}
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.tools.api.RecordReader
 
+import java.lang
 import scala.annotation.nowarn
 
 @nowarn("cat=deprecation")
@@ -104,7 +105,7 @@ object ConsoleProducer extends Logging {
         System.err.println(e.getMessage)
         Exit.exit(1)
       case e: Exception =>
-        e.printStackTrace
+        e.printStackTrace()
         Exit.exit(1)
     }
   }
@@ -174,81 +175,81 @@ object ConsoleProducer extends Logging {
   }
 
   class ProducerConfig(args: Array[String]) extends CommandDefaultOptions(args) {
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+    val topicOpt: OptionSpec[String] = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
       .withRequiredArg
       .describedAs("topic")
       .ofType(classOf[String])
-    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+    private val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
       .withRequiredArg
       .describedAs("broker-list")
       .ofType(classOf[String])
-    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+    val bootstrapServerOpt: OptionSpec[String]  = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
       .requiredUnless("broker-list")
       .withRequiredArg
       .describedAs("server to connect to")
       .ofType(classOf[String])
-    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
+    private val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val compressionCodecOpt: OptionSpec[String]  = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
                                                                   "If specified without value, then it defaults to 'gzip'")
                                     .withOptionalArg()
                                     .describedAs("compression-codec")
                                     .ofType(classOf[String])
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. "+
+    val batchSizeOpt: OptionSpec[Integer] = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. "+
        "please note that this option will be replaced if max-partition-memory-bytes is also set")
       .withRequiredArg
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(16 * 1024)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
+    val messageSendMaxRetriesOpt: OptionSpec[Integer] = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, " +
       "and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. " +
       "This is the option to control `retries` in producer configs.")
       .withRequiredArg
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(3)
-    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
+    val retryBackoffMsOpt: OptionSpec[lang.Long] = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. " +
       "Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. " +
       "This is the option to control `retry.backoff.ms` in producer configs.")
       .withRequiredArg
       .ofType(classOf[java.lang.Long])
       .defaultsTo(100)
-    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+    val sendTimeoutOpt: OptionSpec[lang.Long] = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
       " a message will queue awaiting sufficient batch size. The value is given in ms. " +
       "This is the option to control `linger.ms` in producer configs.")
       .withRequiredArg
       .describedAs("timeout_ms")
       .ofType(classOf[java.lang.Long])
       .defaultsTo(1000)
-    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
+    val requestRequiredAcksOpt: OptionSpec[String] = parser.accepts("request-required-acks", "The required `acks` of the producer requests")
       .withRequiredArg
       .describedAs("request required acks")
       .ofType(classOf[java.lang.String])
       .defaultsTo("-1")
-    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
+    val requestTimeoutMsOpt: OptionSpec[Integer] = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero.")
       .withRequiredArg
       .describedAs("request timeout ms")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1500)
-    val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+    val metadataExpiryMsOpt: OptionSpec[lang.Long] = parser.accepts("metadata-expiry-ms",
       "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. " +
         "This is the option to control `metadata.max.age.ms` in producer configs.")
       .withRequiredArg
       .describedAs("metadata expiration interval")
       .ofType(classOf[java.lang.Long])
       .defaultsTo(5*60*1000L)
-    val maxBlockMsOpt = parser.accepts("max-block-ms",
+    val maxBlockMsOpt: OptionSpec[lang.Long] = parser.accepts("max-block-ms",
       "The max time that the producer will block for during a send request.")
       .withRequiredArg
       .describedAs("max block on send")
       .ofType(classOf[java.lang.Long])
       .defaultsTo(60*1000L)
-    val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+    val maxMemoryBytesOpt: OptionSpec[lang.Long] = parser.accepts("max-memory-bytes",
       "The total memory used by the producer to buffer records waiting to be sent to the server. " +
         "This is the option to control `buffer.memory` in producer configs.")
       .withRequiredArg
       .describedAs("total memory in bytes")
       .ofType(classOf[java.lang.Long])
       .defaultsTo(32 * 1024 * 1024L)
-    val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+    val maxPartitionMemoryBytesOpt: OptionSpec[Integer] = parser.accepts("max-partition-memory-bytes",
       "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
         "will attempt to optimistically group them together until this size is reached. " +
         "This is the option to control `batch.size` in producer configs.")
@@ -256,19 +257,19 @@ object ConsoleProducer extends Logging {
       .describedAs("memory in bytes per partition")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(16 * 1024)
-    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+    private val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
       "By default each line is read as a separate message.")
       .withRequiredArg
       .describedAs("reader_class")
       .ofType(classOf[java.lang.String])
       .defaultsTo(classOf[LineMessageReader].getName)
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size. " +
+    val socketBufferSizeOpt: OptionSpec[Integer] = parser.accepts("socket-buffer-size", "The size of the tcp RECV size. " +
       "This is the option to control `send.buffer.bytes` in producer configs.")
       .withRequiredArg
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1024*100)
-    val propertyOpt = parser.accepts("property",
+    private val propertyOpt = parser.accepts("property",
       """A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
         |Default properties include:
         | parse.key=false
@@ -291,15 +292,15 @@ object ConsoleProducer extends Logging {
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
-    val readerConfigOpt = parser.accepts("reader-config", s"Config properties file for the message reader. Note that $propertyOpt takes precedence over this config.")
+    val readerConfigOpt: OptionSpec[String] = parser.accepts("reader-config", s"Config properties file for the message reader. Note that $propertyOpt takes precedence over this config.")
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
-    val producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+    private val producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
             .withRequiredArg
             .describedAs("producer_prop")
             .ofType(classOf[String])
-    val producerConfigOpt = parser.accepts("producer.config", s"Producer config properties file. Note that $producerPropertyOpt takes precedence over this config.")
+    val producerConfigOpt: OptionSpec[String]  = parser.accepts("producer.config", s"Producer config properties file. Note that $producerPropertyOpt takes precedence over this config.")
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
@@ -310,24 +311,24 @@ object ConsoleProducer extends Logging {
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
 
-    val topic = options.valueOf(topicOpt)
+    val topic: String = options.valueOf(topicOpt)
 
-    val bootstrapServer = options.valueOf(bootstrapServerOpt)
-    val brokerList = options.valueOf(brokerListOpt)
+    val bootstrapServer: String = options.valueOf(bootstrapServerOpt)
+    val brokerList: String = options.valueOf(brokerListOpt)
 
-    val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
+    val brokerHostsAndPorts: String = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
     ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts)
 
-    val sync = options.has(syncOpt)
-    val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
-    val compressionCodec = if (options.has(compressionCodecOpt))
+    val sync: Boolean = options.has(syncOpt)
+    private val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
+    val compressionCodec: String = if (options.has(compressionCodecOpt))
                              if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
                                CompressionType.GZIP.name
                              else compressionCodecOptionValue
                            else CompressionType.NONE.name
-    val readerClass = options.valueOf(messageReaderOpt)
-    val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
-    val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
+    val readerClass: String = options.valueOf(messageReaderOpt)
+    val cmdLineProps: Properties = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
+    val extraProducerProps: Properties = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
 
     def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
       try
@@ -341,17 +342,17 @@ object ConsoleProducer extends Logging {
 
   class LineMessageReader extends RecordReader {
     var topic: String = _
-    var parseKey = false
-    var keySeparator = "\t"
-    var parseHeaders = false
-    var headersDelimiter = "\t"
-    var headersSeparator = ","
-    var headersKeySeparator = ":"
-    var ignoreError = false
-    var lineNumber = 0
-    var printPrompt = System.console != null
-    var headersSeparatorPattern: Pattern = _
-    var nullMarker: String = _
+    var parseKey: Boolean = false
+    var keySeparator: String = "\t"
+    var parseHeaders: Boolean = false
+    private var headersDelimiter = "\t"
+    var headersSeparator: String = ","
+    private var headersKeySeparator = ":"
+    private var ignoreError = false
+    private var lineNumber = 0
+    private val printPrompt = System.console != null
+    private var headersSeparatorPattern: Pattern = _
+    private var nullMarker: String = _
 
     override def configure(props: java.util.Map[String, _]): Unit = {
       topic = props.get("topic").toString
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8a37e659220..8aa5995612c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -347,9 +347,9 @@ object DumpLogSegments {
   }
 
   class TimeIndexDumpErrors {
-    val misMatchesForTimeIndexFilesMap = mutable.Map[String, ArrayBuffer[(Long, Long)]]()
-    val outOfOrderTimestamp = mutable.Map[String, ArrayBuffer[(Long, Long)]]()
-    val shallowOffsetNotFound = mutable.Map[String, ArrayBuffer[(Long, Long)]]()
+    val misMatchesForTimeIndexFilesMap: mutable.Map[String, ArrayBuffer[(Long, Long)]] = mutable.Map[String, ArrayBuffer[(Long, Long)]]()
+    val outOfOrderTimestamp: mutable.Map[String, ArrayBuffer[(Long, Long)]] = mutable.Map[String, ArrayBuffer[(Long, Long)]]()
+    val shallowOffsetNotFound: mutable.Map[String, ArrayBuffer[(Long, Long)]] = mutable.Map[String, ArrayBuffer[(Long, Long)]]()
 
     def recordMismatchTimeIndex(file: File, indexTimestamp: Long, logTimestamp: Long): Unit = {
       val misMatchesSeq = misMatchesForTimeIndexFilesMap.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
@@ -374,21 +374,19 @@ object DumpLogSegments {
 
     def printErrors(): Unit = {
       misMatchesForTimeIndexFilesMap.foreach {
-        case (fileName, listOfMismatches) => {
+        case (fileName, listOfMismatches) =>
           System.err.println("Found timestamp mismatch in :" + fileName)
           listOfMismatches.foreach(m => {
             System.err.println("  Index timestamp: %d, log timestamp: %d".format(m._1, m._2))
           })
-        }
       }
 
       outOfOrderTimestamp.foreach {
-        case (fileName, outOfOrderTimestamps) => {
+        case (fileName, outOfOrderTimestamps) =>
           System.err.println("Found out of order timestamp in :" + fileName)
           outOfOrderTimestamps.foreach(m => {
             System.err.println("  Index timestamp: %d, Previously indexed timestamp: %d".format(m._1, m._2))
           })
-        }
       }
 
       shallowOffsetNotFound.values.foreach { listOfShallowOffsetNotFound =>
@@ -413,7 +411,7 @@ object DumpLogSegments {
   }
 
   private class ClusterMetadataLogMessageParser extends MessageParser[String, String] {
-    val metadataRecordSerde = new MetadataRecordSerde()
+    private val metadataRecordSerde = new MetadataRecordSerde()
 
     override def parse(record: Record): (Option[String], Option[String]) = {
       val output = try {
@@ -425,11 +423,10 @@ object DumpLogSegments {
         json.set("version", new IntNode(messageAndVersion.version()))
         json.set("data", MetadataJsonConverters.writeJson(
           messageAndVersion.message(), messageAndVersion.version()))
-        json.toString()
+        json.toString
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           s"Error at ${record.offset}, skipping. ${e.getMessage}"
-        }
       }
       // No keys for metadata records
       (None, Some(output))
@@ -437,39 +434,39 @@ object DumpLogSegments {
   }
 
   private class DumpLogSegmentsOptions(args: Array[String]) extends CommandDefaultOptions(args) {
-    val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
-    val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content.")
-    val indexSanityOpt = parser.accepts("index-sanity-check", "if set, just checks the index sanity without printing its content. " +
+    private val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified.")
+    private val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content.")
+    private val indexSanityOpt = parser.accepts("index-sanity-check", "if set, just checks the index sanity without printing its content. " +
       "This is the same check that is executed on broker startup to determine if an index needs rebuilding or not.")
-    val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped.")
+    private val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped.")
       .withRequiredArg
       .describedAs("file1, file2, ...")
       .ofType(classOf[String])
-    val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
+    private val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
       .withRequiredArg
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(5 * 1024 * 1024)
-    val maxBytesOpt = parser.accepts("max-bytes", "Limit the amount of total batches read in bytes avoiding reading the whole .log file(s).")
+    private val maxBytesOpt = parser.accepts("max-bytes", "Limit the amount of total batches read in bytes avoiding reading the whole .log file(s).")
        .withRequiredArg
        .describedAs("size")
        .ofType(classOf[java.lang.Integer])
        .defaultsTo(Integer.MAX_VALUE)
-    val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
-    val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+    private val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.")
+    private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
       .withOptionalArg()
       .ofType(classOf[java.lang.String])
       .defaultsTo("kafka.serializer.StringDecoder")
-    val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
+    private val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
       .withOptionalArg()
       .ofType(classOf[java.lang.String])
       .defaultsTo("kafka.serializer.StringDecoder")
-    val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " +
+    private val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " +
       "__consumer_offsets topic.")
-    val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
+    private val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " +
       "transaction metadata from the __transaction_state topic.")
-    val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "if set, log data will be parsed as cluster metadata records.")
-    val skipRecordMetadataOpt = parser.accepts("skip-record-metadata", "whether to skip printing metadata for each record.")
+    private val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "if set, log data will be parsed as cluster metadata records.")
+    private val skipRecordMetadataOpt = parser.accepts("skip-record-metadata", "whether to skip printing metadata for each record.")
     options = parser.parse(args : _*)
 
     def messageParser: MessageParser[_, _] =
@@ -492,13 +489,13 @@ object DumpLogSegments {
       options.has(valueDecoderOpt) ||
       options.has(keyDecoderOpt)
 
-    lazy val skipRecordMetadata = options.has(skipRecordMetadataOpt)
+    lazy val skipRecordMetadata: Boolean = options.has(skipRecordMetadataOpt)
     lazy val isDeepIteration: Boolean = options.has(deepIterationOpt) || shouldPrintDataLog
     lazy val verifyOnly: Boolean = options.has(verifyOpt)
     lazy val indexSanityOnly: Boolean = options.has(indexSanityOpt)
-    lazy val files = options.valueOf(filesOpt).split(",")
-    lazy val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
-    lazy val maxBytes = options.valueOf(maxBytesOpt).intValue()
+    lazy val files: Array[String] = options.valueOf(filesOpt).split(",")
+    lazy val maxMessageSize: Int = options.valueOf(maxMessageSizeOpt).intValue()
+    lazy val maxBytes: Int = options.valueOf(maxBytesOpt).intValue()
 
     def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
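
    For reference, a minimal standalone sketch (not part of this patch; parser and option names are invented) of the pattern applied throughout the DumpLogSegments options class above: option specs that are only read inside the class become private, and the lazy accessors gain explicit result types.

        import joptsimple.{OptionParser, OptionSet, OptionSpec}

        class ExampleOptions(args: Array[String]) {
          private val parser = new OptionParser()
          // Private because nothing outside this class needs the raw spec.
          private val filesOpt: OptionSpec[String] = parser.accepts("files", "comma-separated list of files")
            .withRequiredArg()
            .ofType(classOf[String])
          private val verifyOpt: OptionSpec[Void] = parser.accepts("verify", "only verify the index")
          private val options: OptionSet = parser.parse(args: _*)

          // Explicit types on the lazy vals document what callers receive.
          lazy val files: Array[String] = options.valueOf(filesOpt).split(",")
          lazy val verifyOnly: Boolean = options.has(verifyOpt)
        }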
 
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 30d836370a9..d328d1df871 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -64,11 +64,11 @@ object StorageTool extends Logging {
           if (!metadataVersion.isKRaftSupported) {
             throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
           }
-          if (!metadataVersion.isProduction()) {
+          if (!metadataVersion.isProduction) {
             if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.")
+              System.out.println(s"WARNING: using pre-production metadata version $metadataVersion.")
             } else {
-              throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.")
+              throw new TerseFailure(s"Metadata version $metadataVersion is not ready for production use yet.")
             }
           }
           val metaProperties = new MetaProperties.Builder().
@@ -78,8 +78,8 @@ object StorageTool extends Logging {
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
           getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-            if (!metadataVersion.isScramSupported()) {
-              throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
+            if (!metadataVersion.isScramSupported) {
+              throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.")
             }
             for (record <- userScramCredentialRecords) {
               metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
@@ -151,7 +151,7 @@ object StorageTool extends Logging {
     directories.toSeq
   }
 
-  def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
+  private def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
 
   def getMetadataVersion(
     namespace: Namespace,
@@ -167,7 +167,7 @@ object StorageTool extends Logging {
       .getOrElse(defaultValue)
   }
 
-  def getUserScramCredentialRecord(
+  private def getUserScramCredentialRecord(
     mechanism: String,
     config: String
   ) : UserScramCredentialRecord = {
@@ -242,7 +242,7 @@ object StorageTool extends Logging {
     val saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations)
 
     val myrecord = try {
-      val formatter = new ScramFormatter(scramMechanism);
+      val formatter = new ScramFormatter(scramMechanism)
 
       new UserScramCredentialRecord()
            .setName(name)
@@ -348,8 +348,8 @@ object StorageTool extends Logging {
 
       prevMetadata.foreach { prev =>
         val sortedOutput = new util.TreeMap[String, String]()
-        prev.toProperties().entrySet().forEach(e => sortedOutput.put(e.getKey.toString, e.getValue.toString))
-        stream.println(s"Found metadata: ${sortedOutput}")
+        prev.toProperties.entrySet.forEach(e => sortedOutput.put(e.getKey.toString, e.getValue.toString))
+        stream.println(s"Found metadata: $sortedOutput")
         stream.println("")
       }
 
@@ -375,7 +375,7 @@ object StorageTool extends Logging {
     val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
     metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
                         setName(MetadataVersion.FEATURE_NAME).
-                        setFeatureLevel(metadataVersion.featureLevel()), 0.toShort));
+                        setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))
 
     metadataOptionalArguments.foreach { metadataArguments =>
       for (record <- metadataArguments) metadataRecords.add(record)
@@ -427,21 +427,21 @@ object StorageTool extends Logging {
       throw new TerseFailure("No log directories found in the configuration.")
     }
     val loader = new MetaPropertiesEnsemble.Loader()
-    directories.foreach(loader.addLogDir(_))
+    directories.foreach(loader.addLogDir)
     val metaPropertiesEnsemble = loader.load()
     metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(),
       util.EnumSet.noneOf(classOf[VerificationFlag]))
 
-    System.out.println(s"metaPropertiesEnsemble=${metaPropertiesEnsemble}")
+    System.out.println(s"metaPropertiesEnsemble=$metaPropertiesEnsemble")
     val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
     if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
       val firstLogDir = copier.logDirProps().keySet().iterator().next()
-      throw new TerseFailure(s"Log directory ${firstLogDir} is already formatted. " +
+      throw new TerseFailure(s"Log directory $firstLogDir is already formatted. " +
         "Use --ignore-formatted to ignore this directory and format the others.")
     }
     if (!copier.errorLogDirs().isEmpty) {
       val firstLogDir = copier.errorLogDirs().iterator().next()
-      throw new TerseFailure(s"I/O error trying to read log directory ${firstLogDir}.")
+      throw new TerseFailure(s"I/O error trying to read log directory $firstLogDir.")
     }
     if (metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
       stream.println("All of the log directories are already formatted.")
@@ -450,14 +450,14 @@ object StorageTool extends Logging {
         copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties).
           setDirectoryId(copier.generateValidDirectoryId()).
           build())
-        copier.setPreWriteHandler((logDir, isNew, metaProperties) => {
-          stream.println(s"Formatting ${logDir} with metadata.version ${metadataVersion}.")
+        copier.setPreWriteHandler((logDir, _, _) => {
+          stream.println(s"Formatting $logDir with metadata.version $metadataVersion.")
           Files.createDirectories(Paths.get(logDir))
           val bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty())
           bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
         })
         copier.setWriteErrorHandler((logDir, e) => {
-          throw new TerseFailure(s"Error while writing meta.properties file ${logDir}: ${e.getMessage}")
+          throw new TerseFailure(s"Error while writing meta.properties file $logDir: ${e.getMessage}")
         })
         copier.writeLogDirChanges()
       })
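
    The StorageTool hunks above are mostly mechanical: trailing semicolons and redundant parentheses on no-arg accessors go away, and interpolation braces are kept only where the interpolated expression is more than a bare identifier. A throwaway illustration (values invented, not taken from the tool):

        object InterpolationExample extends App {
          val metadataVersion = "3.7-IV0"
          val logDir = "/tmp/kraft-combined-logs"
          println(s"WARNING: using pre-production metadata version $metadataVersion.")  // braces not needed for a simple identifier
          println(s"Formatting $logDir with metadata.version $metadataVersion.")
          println(s"log dir name length: ${logDir.length}")                             // braces required for an expression
        }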
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index f2cdc26723a..420b935dc29 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
-import joptsimple.OptionException
+import joptsimple.{OptionException, OptionSpec}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.{KafkaRaftManager, RaftManager}
 import kafka.security.CredentialProvider
@@ -68,7 +68,7 @@ class TestRaftServer(
   var credentialProvider: CredentialProvider = _
   var tokenCache: DelegationTokenCache = _
   var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
-  var workloadGenerator: RaftWorkloadGenerator = _
+  private var workloadGenerator: RaftWorkloadGenerator = _
   var raftManager: KafkaRaftManager[Array[Byte]] = _
 
   def startup(): Unit = {
@@ -142,7 +142,7 @@ class TestRaftServer(
     shutdownLatch.await()
   }
 
-  class RaftWorkloadGenerator(
+  private class RaftWorkloadGenerator(
     raftManager: RaftManager[Array[Byte]],
     time: Time,
     recordsPerSec: Int,
@@ -150,12 +150,12 @@ class TestRaftServer(
   ) extends ShutdownableThread("raft-workload-generator")
     with RaftClient.Listener[Array[Byte]] {
 
-    sealed trait RaftEvent
-    case class HandleClaim(epoch: Int) extends RaftEvent
-    case object HandleResign extends RaftEvent
-    case class HandleCommit(reader: BatchReader[Array[Byte]]) extends RaftEvent
-    case class HandleSnapshot(reader: SnapshotReader[Array[Byte]]) extends RaftEvent
-    case object Shutdown extends RaftEvent
+    private sealed trait RaftEvent
+    private case class HandleClaim(epoch: Int) extends RaftEvent
+    private case object HandleResign extends RaftEvent
+    private case class HandleCommit(reader: BatchReader[Array[Byte]]) extends RaftEvent
+    private case class HandleSnapshot(reader: SnapshotReader[Array[Byte]]) extends RaftEvent
+    private case object Shutdown extends RaftEvent
 
     private val eventQueue = new LinkedBlockingDeque[RaftEvent]()
     private val stats = new WriteStats(metrics, time, printIntervalMs = 5000)
@@ -411,20 +411,20 @@ object TestRaftServer extends Logging {
     }
   }
 
-  class TestRaftServerOptions(args: Array[String]) extends CommandDefaultOptions(args) {
-    val configOpt = parser.accepts("config", "Required configured file")
+  private class TestRaftServerOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+    val configOpt: OptionSpec[String] = parser.accepts("config", "Required configured file")
       .withRequiredArg
       .describedAs("filename")
       .ofType(classOf[String])
 
-    val throughputOpt = parser.accepts("throughput",
+    val throughputOpt: OptionSpec[Int] = parser.accepts("throughput",
       "The number of records per second the leader will write to the metadata topic")
       .withRequiredArg
       .describedAs("records/sec")
       .ofType(classOf[Int])
       .defaultsTo(5000)
 
-    val recordSizeOpt = parser.accepts("record-size", "The size of each record")
+    val recordSizeOpt: OptionSpec[Int] = parser.accepts("record-size", "The size of each record")
       .withRequiredArg
       .describedAs("size in bytes")
       .ofType(classOf[Int])
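
    The RaftEvent hierarchy made private above follows a common event-queue shape; a self-contained sketch of that shape (names invented, not the TestRaftServer code):

        import java.util.concurrent.LinkedBlockingDeque

        class EventLoop {
          // The ADT is only matched on inside this class, so it can be private in its entirety.
          private sealed trait Event
          private case class Append(payload: Array[Byte]) extends Event
          private case object Shutdown extends Event

          private val queue = new LinkedBlockingDeque[Event]()

          def append(payload: Array[Byte]): Unit = queue.offer(Append(payload))
          def shutdown(): Unit = queue.offer(Shutdown)

          def run(): Unit = while (runOnce()) {}

          // Returns false once the shutdown event has been consumed.
          private def runOnce(): Boolean = queue.take() match {
            case Append(payload) => println(s"appending ${payload.length} bytes"); true
            case Shutdown => false
          }
        }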
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 6af1f3594e2..0b445ed1a3c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -159,7 +159,7 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
   }
 
-  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+  private def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
     val distinctPorts = endpoints.map(_.port).distinct
     require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
   }
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index 0221821e747..dd83e903360 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -40,7 +40,7 @@ private object Logging {
 
 trait Logging {
 
-  protected lazy val logger = Logger(LoggerFactory.getLogger(loggerName))
+  protected lazy val logger: Logger = Logger(LoggerFactory.getLogger(loggerName))
 
   protected var logIdent: String = _
 
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index e023503275b..72925a036b1 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -452,7 +452,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
    * @param ip ip for which configs are being validated
    * @param configs properties to validate for the IP
    */
-  def validateIpConfig(ip: String, configs: Properties): Unit = {
+  private def validateIpConfig(ip: String, configs: Properties): Unit = {
     if (!DynamicConfig.Ip.isValidIpEntity(ip))
       throw new AdminOperationException(s"$ip is not a valid IP or resolvable host.")
     DynamicConfig.Ip.validate(configs)
@@ -528,7 +528,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
     * only verifies that the provided config does not contain any static configs.
     * @param configs configs to validate
     */
-  def validateBrokerConfig(configs: Properties): Unit = {
+  private def validateBrokerConfig(configs: Properties): Unit = {
     DynamicConfig.Broker.validate(configs)
   }
 
@@ -561,13 +561,6 @@ class AdminZkClient(zkClient: KafkaZkClient,
     zkClient.getEntityConfigs(rootEntityType, sanitizedEntityName)
   }
 
-  /**
-   * Gets all topic configs
-   * @return The successfully gathered configs of all topics
-   */
-  def getAllTopicConfigs(): Map[String, Properties] =
-    zkClient.getAllTopicsInCluster().map(topic => (topic, fetchEntityConfig(ConfigType.TOPIC, topic))).toMap
-
   /**
    * Gets all the entity configs for a given entityType
    * @param entityType entityType for which configs are being fetched
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c38f866fb0e..26c9453bef2 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -235,7 +235,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
           case Code.OK =>
             info(s"Successfully registered KRaft controller $kraftControllerId with ZK epoch $newControllerEpoch")
             // First op is always SetData on /controller_epoch
-            val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult]
+            val setDataResult = response.zkOpResults.head.rawOpResult.asInstanceOf[SetDataResult]
             SuccessfulRegistrationResult(newControllerEpoch, setDataResult.getStat.getVersion)
           case Code.BADVERSION =>
             info(s"The ZK controller epoch changed $failureSuffix")
@@ -590,7 +590,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   /**
    * Checks the topic existence
-   * @param topicName
+   * @param topicName the name of the topic to check
    * @return true if topic exists else false
    */
   def topicExists(topicName: String): Boolean = {
@@ -646,7 +646,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   def setTopicAssignment(topic: String,
                          topicId: Option[Uuid],
                          assignment: Map[TopicPartition, ReplicaAssignment],
-                         expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = {
+                         expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion): Unit = {
     val setDataResponse = setTopicAssignmentRaw(topic, topicId, assignment, expectedControllerEpochZkVersion)
     setDataResponse.maybeThrow()
   }
@@ -893,7 +893,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   /**
    * Gets all the child nodes at a given zk node path
-   * @param path
+   * @param path the path to check
    * @return list of child node names
    */
   def getChildren(path : String): Seq[String] = {
@@ -958,7 +958,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   /**
    * Checks if topic is marked for deletion
-   * @param topic
+   * @param topic the name of the topic to check
    * @return true if topic is marked for deletion, else false
    */
   def isTopicMarkedForDeletion(topic: String): Boolean = {
@@ -1046,7 +1046,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param reassignment the reassignment to set on the reassignment znode.
    * @throws KeeperException if there is an error while creating the znode.
    */
-  def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]])  = {
+  def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]]): Unit = {
     createRecursive(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment))
   }
 
@@ -1184,7 +1184,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   /**
    * Creates preferred replica election znode with partitions undergoing election
-   * @param partitions
+   * @param partitions the set of partitions
    * @throws KeeperException if there is an error while creating the znode
    */
   def createPreferredReplicaElection(partitions: Set[TopicPartition]): Unit = {
@@ -1229,7 +1229,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
 
-  def getControllerRegistration: Option[ZKControllerRegistration] = {
+  private def getControllerRegistration: Option[ZKControllerRegistration] = {
     val getDataRequest = GetDataRequest(ControllerZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
@@ -1692,7 +1692,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
-  def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+  private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
     val createRequest = CreateRequest(
       MigrationZNode.path,
       MigrationZNode.encode(initialState),
@@ -1771,7 +1771,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     * Pre-create top level paths in ZK if needed.
     */
   def createTopLevelPaths(): Unit = {
-    ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists(_))
+    ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)
   }
 
   /**
@@ -1803,7 +1803,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   def deleteFeatureZNode(): Unit = {
-    deletePath(FeatureZNode.path, ZkVersion.MatchAnyVersion, false)
+    deletePath(FeatureZNode.path, ZkVersion.MatchAnyVersion, recursiveDelete = false)
   }
 
   private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = {
@@ -1812,7 +1812,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     retryRequestUntilConnected(setDataRequest)
   }
 
-  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long) = {
+  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
     val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
     createRecursive(path, ConsumerOffset.encode(offset))
   }
@@ -1848,11 +1848,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
-  private[kafka] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+  private[kafka] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true): Unit = {
 
     def parentPath(path: String): String = {
       val indexOfLastSlash = path.lastIndexOf("/")
-      if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
+      if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path $path")
       path.substring(0, indexOfLastSlash)
     }
 
@@ -1994,10 +1994,10 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
                 case Some(value) =>
                   val failedPayload = MigrationZNode.decode(value, version, -1)
                   throw new RuntimeException(
-                    s"Conditional update on KRaft Migration ZNode failed. Expected zkVersion = ${version}. The failed " +
-                    s"write was: ${failedPayload}. This indicates that another KRaft controller is making writes to ZooKeeper.")
+                    s"Conditional update on KRaft Migration ZNode failed. Expected zkVersion = $version. The failed " +
+                    s"write was: $failedPayload. This indicates that another KRaft controller is making writes to ZooKeeper.")
                 case None =>
-                  throw new RuntimeException(s"Check op on KRaft Migration ZNode failed. Expected zkVersion = ${version}. " +
+                  throw new RuntimeException(s"Check op on KRaft Migration ZNode failed. Expected zkVersion = $version. " +
                     s"This indicates that another KRaft controller is making writes to ZooKeeper.")
               }
             } else if (errorCode == Code.OK) {
@@ -2010,7 +2010,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
             throw new RuntimeException(s"Got migration result for incorrect path $path")
           }
         case _ => throw new RuntimeException(
-          s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}")
+          s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw $migrationResult")
       }
     }
 
@@ -2097,7 +2097,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     stat
   }
 
-  private def isZKSessionIdDiffFromCurrentZKSessionId(): Boolean = {
+  private def isZKSessionIdDiffFromCurrentZKSessionId: Boolean = {
     zooKeeperClient.sessionId != currentZooKeeperSessionId
   }
 
@@ -2106,7 +2106,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private[zk] def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = {
-    isZKSessionTheEphemeralOwner(ephemeralOwnerId) && isZKSessionIdDiffFromCurrentZKSessionId()
+    isZKSessionTheEphemeralOwner(ephemeralOwnerId) && isZKSessionIdDiffFromCurrentZKSessionId
   }
 
   private def updateCurrentZKSessionId(newSessionId: Long): Unit = {
@@ -2125,7 +2125,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
           val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult]
           setDataResult.getStat
         case Code.NODEEXISTS =>
-          getAfterNodeExists()
+          getAfterNodeExists
         case code =>
           error(s"Error while creating ephemeral at $path with return code: $code")
           throw KeeperException.create(code)
@@ -2166,7 +2166,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
       }
     }
 
-    private def getAfterNodeExists(): Stat = {
+    private def getAfterNodeExists: Stat = {
       val getDataRequest = GetDataRequest(path)
       val getDataResponse = retryRequestUntilConnected(getDataRequest)
       val ephemeralOwnerId = getDataResponse.stat.getEphemeralOwner
@@ -2245,7 +2245,7 @@ object KafkaZkClient {
      * changed in 3.6.0.
      */
     if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null)
-      zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, ((4096 * 1024).toString))
+      zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, (4096 * 1024).toString)
 
     if (createChrootIfNecessary) {
       val chrootIndex = connectString.indexOf("/")
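
    Several KafkaZkClient changes above replace foreach(method(_)) with foreach(method). The three forms below are interchangeable for a one-argument method, which is why the placeholder is dropped (illustrative snippet only):

        object ForeachForms extends App {
          def ensurePath(p: String): Unit = println(s"ensuring $p")
          val paths = Seq("/brokers", "/config", "/admin")

          paths.foreach(p => ensurePath(p))  // explicit lambda
          paths.foreach(ensurePath(_))       // placeholder syntax
          paths.foreach(ensurePath)          // method value via eta-expansion, the form used after this cleanup
        }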
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 40a74114ff3..3e830f97f39 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -185,7 +185,7 @@ object BrokerIdZNode {
       broker.rack, broker.features)
   }
 
-  def featuresAsJavaMap(brokerInfo: JsonObject): util.Map[String, util.Map[String, java.lang.Short]] = {
+  private def featuresAsJavaMap(brokerInfo: JsonObject): util.Map[String, util.Map[String, java.lang.Short]] = {
     FeatureZNode.asJavaMap(brokerInfo
       .get(FeaturesKey)
       .flatMap(_.to[Option[Map[String, Map[String, Int]]]])
@@ -461,7 +461,7 @@ object IsrChangeNotificationSequenceZNode {
       }
     }
   }.map(_.toSet).getOrElse(Set.empty)
-  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
+  def sequenceNumber(path: String): String = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
 }
 
 object LogDirEventNotificationZNode {
@@ -470,15 +470,15 @@ object LogDirEventNotificationZNode {
 
 object LogDirEventNotificationSequenceZNode {
   val SequenceNumberPrefix = "log_dir_event_"
-  val LogDirFailureEvent = 1
+  private val LogDirFailureEvent = 1
   def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(brokerId: Int) = {
+  def encode(brokerId: Int): Array[Byte] = {
     Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent).asJava)
   }
   def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
     js.asJsonObject("broker").to[Int]
   }
-  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
+  def sequenceNumber(path: String): String = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
 }
 
 object AdminZNode {
@@ -561,14 +561,14 @@ object ConsumerPathZNode {
 }
 
 object ConsumerOffset {
-  def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
+  def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/$group/offsets/$topic/$partition"
   def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
 }
 
 object ZkVersion {
-  val MatchAnyVersion = -1 // if used in a conditional set, matches any version (the value should match ZooKeeper codebase)
-  val UnknownVersion = -2  // Version returned from get if node does not exist (internal constant for Kafka codebase, unused value in ZK)
+  val MatchAnyVersion: Int = -1 // if used in a conditional set, matches any version (the value should match ZooKeeper codebase)
+  val UnknownVersion: Int = -2  // Version returned from get if node does not exist (internal constant for Kafka codebase, unused value in ZK)
 }
 
 object ZkStat {
@@ -794,7 +794,7 @@ object ClusterIdZNode {
 
   def fromJson(clusterIdJson:  Array[Byte]): String = {
     Json.parseBytes(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
-      throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson")
+      throw new KafkaException(s"Failed to parse the cluster id json ${clusterIdJson.mkString("Array(", ", ", ")")}")
     }
   }
 }
@@ -844,7 +844,7 @@ object DelegationTokenChangeNotificationZNode {
 object DelegationTokenChangeNotificationSequenceZNode {
   val SequenceNumberPrefix = "token_change_"
   def createPath = s"${DelegationTokenChangeNotificationZNode.path}/$SequenceNumberPrefix"
-  def deletePath(sequenceNode: String) = s"${DelegationTokenChangeNotificationZNode.path}/${sequenceNode}"
+  def deletePath(sequenceNode: String) = s"${DelegationTokenChangeNotificationZNode.path}/$sequenceNode"
   def encode(tokenId : String): Array[Byte] = tokenId.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
 }
@@ -1078,7 +1078,7 @@ object MigrationZNode {
 object ZkData {
 
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
-  val SecureRootPaths = Seq(AdminZNode.path,
+  val SecureRootPaths: Seq[String] = Seq(AdminZNode.path,
     BrokersZNode.path,
     ClusterZNode.path,
     ConfigZNode.path,
@@ -1093,7 +1093,7 @@ object ZkData {
     FeatureZNode.path) ++ ZkAclStore.securePaths
 
   // These are persistent ZK paths that should exist on kafka broker startup.
-  val PersistentZkPaths = Seq(
+  val PersistentZkPaths: Seq[String] = Seq(
     ConsumerPathZNode.path, // old consumer path
     BrokerIdsZNode.path,
     TopicsZNode.path,
@@ -1105,7 +1105,7 @@ object ZkData {
     LogDirEventNotificationZNode.path
   ) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
 
-  val SensitiveRootPaths = Seq(
+  val SensitiveRootPaths: Seq[String] = Seq(
     ConfigEntityTypeZNode.path(ConfigType.USER),
     ConfigEntityTypeZNode.path(ConfigType.BROKER),
     DelegationTokensZNode.path
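
    The ClusterIdZNode change above is one of the "make sure Arrays are not printed with .toString" fixes from the commit message: interpolating an Array[Byte] directly yields the JVM's default toString (element type plus identity hash), so the error message switches to mkString. A minimal demonstration (values invented):

        import java.nio.charset.StandardCharsets

        object ArrayPrinting extends App {
          val clusterIdJson: Array[Byte] = """{"id":"abc"}""".getBytes(StandardCharsets.UTF_8)
          println(s"raw interpolation: $clusterIdJson")                                  // prints something like [B@1b6d3586
          println(s"readable form: ${clusterIdJson.mkString("Array(", ", ", ")")}")      // prints Array(123, 34, 105, ...)
        }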
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 5e06cd5e918..ee960bd35f8 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -46,7 +46,7 @@ import scala.jdk.CollectionConverters._
 
 object ZkMigrationClient {
 
-  val MaxBatchSize = 100
+  private val MaxBatchSize = 100
 
   def apply(
     zkClient: KafkaZkClient,
@@ -295,17 +295,16 @@ class ZkMigrationClient(
     })
   }
 
-  def migrateDelegationTokens(
+  private def migrateDelegationTokens(
     recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
   ): Unit = wrapZkException {
     val batch = new util.ArrayList[ApiMessageAndVersion]()
     val tokens = zkClient.getChildren(DelegationTokensZNode.path)
     for (tokenId <- tokens) {
       zkClient.getDelegationTokenInfo(tokenId) match {
-        case Some(tokenInformation) => {
+        case Some(tokenInformation) =>
           val newDelegationTokenData = new DelegationTokenData(tokenInformation)
           batch.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), 0.toShort))
-        }
         case None =>
       }
     }
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index 0f37d142bcb..846a599875b 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -112,7 +112,7 @@ class ZkConfigMigrationClient(
       // Taken from ZkAdminManager
       val components = name.split("/")
       if (components.size != 3 || components(1) != "clients")
-        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+        throw new IllegalArgumentException(s"Unexpected config path: $name")
       val entity = List(
         buildEntityData(ClientQuotaEntity.USER, components(0)),
         buildEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
@@ -278,7 +278,7 @@ class ZkConfigMigrationClient(
     quotas.forEach { case (key, value) =>
       val configKey = configKeys.get(key)
       if (configKey == null) {
-        throw new MigrationClientException(s"Invalid configuration key ${key}")
+        throw new MigrationClientException(s"Invalid configuration key $key")
       } else {
         configKey.`type` match {
           case ConfigDef.Type.DOUBLE =>
@@ -290,7 +290,7 @@ class ZkConfigMigrationClient(
             else
               (value + epsilon).toInt
             if ((intValue.toDouble - value).abs > epsilon)
-              throw new InvalidRequestException(s"Configuration ${key} must be a ${configKey.`type`} value")
+              throw new InvalidRequestException(s"Configuration $key must be a ${configKey.`type`} value")
             props.setProperty(key, intValue.toString)
           case _ =>
             throw new MigrationClientException(s"Unexpected config type ${configKey.`type`}")
diff --git a/core/src/main/scala/kafka/zk/migration/ZkDelegationTokenMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkDelegationTokenMigrationClient.scala
index 5b68f221acb..77b301a751a 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkDelegationTokenMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkDelegationTokenMigrationClient.scala
@@ -36,7 +36,7 @@ class ZkDelegationTokenMigrationClient(
 
   val adminZkClient = new AdminZkClient(zkClient)
 
-  override def getDelegationTokens(): java.util.List[String] = {
+  override def getDelegationTokens: java.util.List[String] = {
       zkClient.getChildren(DelegationTokensZNode.path).asJava
   }
 
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 04c9184791c..40648c57cb9 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -179,7 +179,7 @@ class ZooKeeperClient(connectString: String,
     // Safe to cast as we always create a response of the right type
     def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
 
-    def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs())
+    def responseMetadata(sendTimeMs: Long) = ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs())
 
     val sendTimeMs = time.hiResClockMs()
 
@@ -348,7 +348,7 @@ class ZooKeeperClient(connectString: String,
       zNodeChildChangeHandlers.clear()
       stateChangeHandlers.clear()
       zooKeeper.close()
-      metricNames.foreach(metricsGroup.removeMetric(_))
+      metricNames.foreach(metricsGroup.removeMetric)
     }
     info("Closed.")
   }
@@ -365,7 +365,7 @@ class ZooKeeperClient(connectString: String,
   private def reinitialize(): Unit = {
     // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
     // may require additional Zookeeper requests, which will block to acquire the initialization lock
-    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
+    stateChangeHandlers.values.foreach(callBeforeInitializingSession)
 
     inWriteLock(initializationLock) {
       if (!connectionState.isAlive) {
@@ -386,7 +386,7 @@ class ZooKeeperClient(connectString: String,
       }
     }
 
-    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
+    stateChangeHandlers.values.foreach(callAfterInitializingSession)
   }
 
   /**