You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/24 13:07:40 UTC
[kafka] branch trunk updated: MINOR: Use underscore for variable initialization in Scala sources (#12534)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4170e742956 MINOR: Use underscore for variable initialization in Scala sources (#12534)
4170e742956 is described below
commit 4170e742956699aa15c4a9deb411b33994e6142e
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Wed Aug 24 15:07:13 2022 +0200
MINOR: Use underscore for variable initialization in Scala sources (#12534)
In Scala it's standard practice to use `_` whenever you are initializing variables. In regard to the implementation, for object references, `_` initialization maps to `null`, so there is no change in behavior.
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../kafka/metrics/KafkaCSVMetricsReporter.scala | 4 +-
.../scala/kafka/metrics/KafkaMetricsReporter.scala | 2 +-
.../main/scala/kafka/network/RequestChannel.scala | 2 +-
.../main/scala/kafka/server/ControllerServer.scala | 18 +++----
.../kafka/server/DelegationTokenManager.scala | 2 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 +-
.../src/main/scala/kafka/server/FetchSession.scala | 2 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 62 +++++++++++-----------
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/server/ReplicationQuotaManager.scala | 2 +-
.../main/scala/kafka/tools/ConsoleConsumer.scala | 6 +--
.../main/scala/kafka/tools/ConsoleProducer.scala | 4 +-
core/src/main/scala/kafka/tools/MirrorMaker.scala | 8 +--
.../kafka/tools/ReplicaVerificationTool.scala | 2 +-
.../scala/kafka/tools/StateChangeLogMerger.scala | 6 +--
core/src/main/scala/kafka/utils/FileLock.scala | 2 +-
.../main/scala/kafka/utils/KafkaScheduler.scala | 2 +-
.../main/scala/kafka/utils/timer/TimerTask.scala | 2 +-
.../scala/kafka/utils/timer/TimerTaskList.scala | 6 +--
.../main/scala/kafka/utils/timer/TimingWheel.scala | 2 +-
.../kafka/admin/ListOffsetsIntegrationTest.scala | 2 +-
.../admin/ReassignPartitionsIntegrationTest.scala | 6 +--
.../AdminClientWithPoliciesIntegrationTest.scala | 2 +-
.../kafka/api/BaseAdminIntegrationTest.scala | 2 +-
.../kafka/api/BaseProducerSendTest.scala | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 8 +--
.../scala/integration/kafka/api/SaslSetup.scala | 2 +-
.../kafka/server/QuorumTestHarness.scala | 4 +-
.../scala/kafka/security/minikdc/MiniKdc.scala | 4 +-
.../other/kafka/ReplicationQuotasTestRig.scala | 4 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 2 +-
.../kafka/admin/DelegationTokenCommandTest.scala | 2 +-
.../kafka/controller/ControllerContextTest.scala | 2 +-
.../PartitionLeaderElectionAlgorithmsTest.scala | 2 +-
.../controller/PartitionStateMachineTest.scala | 8 +--
.../kafka/controller/ReplicaStateMachineTest.scala | 8 +--
.../coordinator/group/GroupCoordinatorTest.scala | 10 ++--
.../group/GroupMetadataManagerTest.scala | 12 ++---
.../coordinator/group/GroupMetadataTest.scala | 2 +-
.../kafka/integration/KafkaServerTestHarness.scala | 4 +-
.../integration/UncleanLeaderElectionTest.scala | 4 +-
.../test/scala/unit/kafka/log/LocalLogTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 4 +-
.../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +-
.../unit/kafka/log/ProducerStateManagerTest.scala | 4 +-
.../test/scala/unit/kafka/log/TimeIndexTest.scala | 2 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 2 +-
.../delegation/DelegationTokenManagerTest.scala | 4 +-
.../unit/kafka/server/BaseFetchRequestTest.scala | 2 +-
.../unit/kafka/server/DelayedOperationTest.scala | 4 +-
.../DelegationTokenRequestsOnPlainTextTest.scala | 2 +-
.../kafka/server/DelegationTokenRequestsTest.scala | 2 +-
...nTokenRequestsWithDisableTokenFeatureTest.scala | 2 +-
.../FetchRequestDownConversionConfigTest.scala | 2 +-
.../kafka/server/FetchRequestMaxBytesTest.scala | 2 +-
.../unit/kafka/server/IsrExpirationTest.scala | 4 +-
.../server/KafkaMetricReporterClusterIdTest.scala | 4 +-
.../kafka/server/KafkaMetricsReporterTest.scala | 4 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 8 +--
.../unit/kafka/server/ReplicationQuotasTest.scala | 4 +-
.../kafka/server/ServerGenerateBrokerIdTest.scala | 8 +--
.../kafka/server/ServerGenerateClusterIdTest.scala | 6 +--
.../unit/kafka/server/ServerStartupTest.scala | 2 +-
...chDrivenReplicationProtocolAcceptanceTest.scala | 6 +--
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../scala/unit/kafka/utils/timer/TimerTest.scala | 2 +-
69 files changed, 161 insertions(+), 161 deletions(-)
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index 607cd188e0e..ffece7f853e 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -36,8 +36,8 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
with KafkaCSVMetricsReporterMBean
with Logging {
- private var csvDir: File = null
- private var underlying: CsvReporter = null
+ private var csvDir: File = _
+ private var underlying: CsvReporter = _
private var running = false
private var initialized = false
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 30baad3464c..6a50dde69d5 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -53,7 +53,7 @@ trait KafkaMetricsReporter {
object KafkaMetricsReporter {
val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
- private var reporters: ArrayBuffer[KafkaMetricsReporter] = null
+ private var reporters: ArrayBuffer[KafkaMetricsReporter] = _
def startReporters(verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
ReporterStarted synchronized {
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2200757c706..eba5e5f929c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -543,7 +543,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
class ErrorMeter(name: String, error: Errors) {
private val tags = Map("request" -> name, "error" -> error.name)
- @volatile private var meter: Meter = null
+ @volatile private var meter: Meter = _
def getOrCreateMeter(): Meter = {
if (meter != null)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 19a6e307d62..fc1d8562b56 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -78,18 +78,18 @@ class ControllerServer(
val awaitShutdownCond = lock.newCondition()
var status: ProcessStatus = SHUTDOWN
- var linuxIoMetricsCollector: LinuxIoMetricsCollector = null
- @volatile var authorizer: Option[Authorizer] = null
- var tokenCache: DelegationTokenCache = null
- var credentialProvider: CredentialProvider = null
- var socketServer: SocketServer = null
+ var linuxIoMetricsCollector: LinuxIoMetricsCollector = _
+ @volatile var authorizer: Option[Authorizer] = None
+ var tokenCache: DelegationTokenCache = _
+ var credentialProvider: CredentialProvider = _
+ var socketServer: SocketServer = _
val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
var createTopicPolicy: Option[CreateTopicPolicy] = None
var alterConfigPolicy: Option[AlterConfigPolicy] = None
- var controller: Controller = null
- var quotaManagers: QuotaManagers = null
- var controllerApis: ControllerApis = null
- var controllerApisHandlerPool: KafkaRequestHandlerPool = null
+ var controller: Controller = _
+ var quotaManagers: QuotaManagers = _
+ var controllerApis: ControllerApis = _
+ var controllerApisHandlerPool: KafkaRequestHandlerPool = _
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 235e3a12ad0..9465197b4ac 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -188,7 +188,7 @@ class DelegationTokenManager(val config: KafkaConfig,
val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs
private val lock = new Object()
- private var tokenChangeListener: ZkNodeChangeNotificationListener = null
+ private var tokenChangeListener: ZkNodeChangeNotificationListener = _
def startup(): Unit = {
if (config.tokenAuthEnabled) {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index a860938124e..99f8234059e 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -210,7 +210,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
- private var currentConfig: KafkaConfig = null
+ private var currentConfig: KafkaConfig = _
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index b32cb8bcf66..b233a5ae69e 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -456,7 +456,7 @@ class IncrementalFetchContext(private val time: Time,
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
val updateFetchContextAndRemoveUnselected: Boolean)
extends FetchSession.RESP_MAP_ITER {
- var nextElement: util.Map.Entry[TopicIdPartition, FetchResponseData.PartitionData] = null
+ var nextElement: util.Map.Entry[TopicIdPartition, FetchResponseData.PartitionData] = _
override def hasNext: Boolean = {
while ((nextElement == null) && iter.hasNext) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6b52511c1ba..592c42d8f4a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -101,55 +101,55 @@ class KafkaServer(
@volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
private var shutdownLatch = new CountDownLatch(1)
- private var logContext: LogContext = null
+ private var logContext: LogContext = _
private val kafkaMetricsReporters: Seq[KafkaMetricsReporter] =
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
- var kafkaYammerMetrics: KafkaYammerMetrics = null
- var metrics: Metrics = null
+ var kafkaYammerMetrics: KafkaYammerMetrics = _
+ var metrics: Metrics = _
- @volatile var dataPlaneRequestProcessor: KafkaApis = null
- var controlPlaneRequestProcessor: KafkaApis = null
+ @volatile var dataPlaneRequestProcessor: KafkaApis = _
+ var controlPlaneRequestProcessor: KafkaApis = _
var authorizer: Option[Authorizer] = None
- @volatile var socketServer: SocketServer = null
- var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
- var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+ @volatile var socketServer: SocketServer = _
+ var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
+ var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
- var logDirFailureChannel: LogDirFailureChannel = null
- @volatile private var _logManager: LogManager = null
+ var logDirFailureChannel: LogDirFailureChannel = _
+ @volatile private var _logManager: LogManager = _
- @volatile private var _replicaManager: ReplicaManager = null
- var adminManager: ZkAdminManager = null
- var tokenManager: DelegationTokenManager = null
+ @volatile private var _replicaManager: ReplicaManager = _
+ var adminManager: ZkAdminManager = _
+ var tokenManager: DelegationTokenManager = _
- var dynamicConfigHandlers: Map[String, ConfigHandler] = null
- var dynamicConfigManager: ZkConfigManager = null
- var credentialProvider: CredentialProvider = null
- var tokenCache: DelegationTokenCache = null
+ var dynamicConfigHandlers: Map[String, ConfigHandler] = _
+ var dynamicConfigManager: ZkConfigManager = _
+ var credentialProvider: CredentialProvider = _
+ var tokenCache: DelegationTokenCache = _
- @volatile var groupCoordinator: GroupCoordinator = null
+ @volatile var groupCoordinator: GroupCoordinator = _
- var transactionCoordinator: TransactionCoordinator = null
+ var transactionCoordinator: TransactionCoordinator = _
- @volatile private var _kafkaController: KafkaController = null
+ @volatile private var _kafkaController: KafkaController = _
var forwardingManager: Option[ForwardingManager] = None
- var autoTopicCreationManager: AutoTopicCreationManager = null
+ var autoTopicCreationManager: AutoTopicCreationManager = _
- var clientToControllerChannelManager: BrokerToControllerChannelManager = null
+ var clientToControllerChannelManager: BrokerToControllerChannelManager = _
- var alterPartitionManager: AlterPartitionManager = null
+ var alterPartitionManager: AlterPartitionManager = _
- var kafkaScheduler: KafkaScheduler = null
+ var kafkaScheduler: KafkaScheduler = _
- @volatile var metadataCache: ZkMetadataCache = null
- var quotaManagers: QuotaFactory.QuotaManagers = null
+ @volatile var metadataCache: ZkMetadataCache = _
+ var quotaManagers: QuotaFactory.QuotaManagers = _
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
- private var _zkClient: KafkaZkClient = null
- private var configRepository: ZkConfigRepository = null
+ private var _zkClient: KafkaZkClient = _
+ private var configRepository: ZkConfigRepository = _
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
@@ -157,10 +157,10 @@ class KafkaServer(
(logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))
}.toMap
- private var _clusterId: String = null
- @volatile var _brokerTopicStats: BrokerTopicStats = null
+ private var _clusterId: String = _
+ @volatile var _brokerTopicStats: BrokerTopicStats = _
- private var _featureChangeListener: FinalizedFeatureChangeListener = null
+ private var _featureChangeListener: FinalizedFeatureChangeListener = _
val brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f60bd53a085..64f0ed9129b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -237,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
- private var logDirFailureHandler: LogDirFailureHandler = null
+ private var logDirFailureHandler: LogDirFailureHandler = _
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
override def doWork(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index c02936973d3..56a8043ba7c 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -76,7 +76,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
private val time: Time) extends Logging with ReplicaQuota {
private val lock = new ReentrantReadWriteLock()
private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
- private var quota: Quota = null
+ private var quota: Quota = _
private val sensorAccess = new SensorAccess(lock, metrics)
private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString,
s"Tracking byte-rate for $replicationType")
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index ecaaa30a9b3..0162998caa3 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -295,9 +295,9 @@ object ConsoleConsumer extends Logging {
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
// topic must be specified.
- var topicArg: String = null
- var includedTopicsArg: String = null
- var filterSpec: TopicFilter = null
+ var topicArg: String = _
+ var includedTopicsArg: String = _
+ var filterSpec: TopicFilter = _
val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 6afd9a923ea..302e7f870a2 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -286,8 +286,8 @@ object ConsoleProducer {
}
class LineMessageReader extends MessageReader {
- var topic: String = null
- var reader: BufferedReader = null
+ var topic: String = _
+ var reader: BufferedReader = _
var parseKey = false
var keySeparator = "\t"
var parseHeaders = false
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index f6e28656c8d..75c2d144b02 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -63,12 +63,12 @@ import scala.util.{Failure, Success, Try}
@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
object MirrorMaker extends Logging with KafkaMetricsGroup {
- private[tools] var producer: MirrorMakerProducer = null
- private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
+ private[tools] var producer: MirrorMakerProducer = _
+ private var mirrorMakerThreads: Seq[MirrorMakerThread] = _
private val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages not successfully sent by mirror maker.
private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
- private var messageHandler: MirrorMakerMessageHandler = null
+ private var messageHandler: MirrorMakerMessageHandler = _
private var offsetCommitIntervalMs = 0
private var abortOnSendFailure: Boolean = true
@volatile private var exitingOnSendFailure: Boolean = false
@@ -297,7 +297,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
customRebalanceListener: Option[ConsumerRebalanceListener],
includeOpt: Option[String]) {
val regex = includeOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports include."))
- var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+ var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = _
// We manually maintain the consumed offsets for historical reasons and it could be simplified
// Visible for testing
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 2c74550b2aa..a9bd2c936e1 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -277,7 +277,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[To
@volatile private var lastReportTime = Time.SYSTEM.milliseconds
private var maxLag: Long = -1L
private var offsetWithMaxLag: Long = -1L
- private var maxLagTopicAndPartition: TopicPartition = null
+ private var maxLagTopicAndPartition: TopicPartition = _
initialize()
def createNewFetcherBarrier(): Unit = {
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index de711e5d02f..1a7f679fe36 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -52,10 +52,10 @@ object StateChangeLogMerger extends Logging {
val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
val dateFormat = new SimpleDateFormat(dateFormatString)
var files: List[String] = List()
- var topic: String = null
+ var topic: String = _
var partitions: List[Int] = List()
- var startDate: Date = null
- var endDate: Date = null
+ var startDate: Date = _
+ var endDate: Date = _
def main(args: Array[String]): Unit = {
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index 2de16386c95..a7ae1aa735b 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -29,7 +29,7 @@ class FileLock(val file: File) extends Logging {
private val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE)
- private var flock: java.nio.channels.FileLock = null
+ private var flock: java.nio.channels.FileLock = _
/**
* Lock the file or throw an exception if the lock is already held
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 354652ee6fb..ca9f7fda1b7 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -71,7 +71,7 @@ trait Scheduler {
class KafkaScheduler(val threads: Int,
val threadNamePrefix: String = "kafka-scheduler-",
daemon: Boolean = true) extends Scheduler with Logging {
- private var executor: ScheduledThreadPoolExecutor = null
+ private var executor: ScheduledThreadPoolExecutor = _
private val schedulerThreadId = new AtomicInteger(0)
override def startup(): Unit = {
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
index a1995c1df0a..3e36b30114b 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -20,7 +20,7 @@ trait TimerTask extends Runnable {
val delayMs: Long // timestamp in millisecond
- private[this] var timerTaskEntry: TimerTaskEntry = null
+ private[this] var timerTaskEntry: TimerTaskEntry = _
def cancel(): Unit = {
synchronized {
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index efd04806180..4bec39270e8 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -129,9 +129,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
@volatile
- var list: TimerTaskList = null
- var next: TimerTaskEntry = null
- var prev: TimerTaskEntry = null
+ var list: TimerTaskList = _
+ var next: TimerTaskEntry = _
+ var prev: TimerTaskEntry = _
// if this timerTask is already held by an existing timer task entry,
// setTimerTaskEntry will remove it.
diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
index 648e6d64fdc..e0bc05bd55c 100644
--- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -106,7 +106,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
- @volatile private[this] var overflowWheel: TimingWheel = null
+ @volatile private[this] var overflowWheel: TimingWheel = _
private[this] def addOverflowWheel(): Unit = {
synchronized {
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index 2a148947fd1..bcb9641e9e8 100644
--- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
val topicName = "foo"
- var adminClient: Admin = null
+ var adminClient: Admin = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 29b4c82740e..d1c1c0919a2 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -40,7 +40,7 @@ import scala.jdk.CollectionConverters._
@Timeout(300)
class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
- var cluster: ReassignPartitionsTestCluster = null
+ var cluster: ReassignPartitionsTestCluster = _
@AfterEach
override def tearDown(): Unit = {
@@ -618,9 +618,9 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
var servers = new mutable.ArrayBuffer[KafkaBroker]
- var brokerList: String = null
+ var brokerList: String = _
- var adminClient: Admin = null
+ var adminClient: Admin = _
def setup(): Unit = {
createServers()
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 5b2213a65e9..44fe4ef64a2 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -43,7 +43,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
import AdminClientWithPoliciesIntegrationTest._
- var client: Admin = null
+ var client: Admin = _
val brokerCount = 3
@BeforeEach
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 4f95654d541..293b198c0ec 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -48,7 +48,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
def brokerCount = 3
override def logDirCount = 2
- var testInfo: TestInfo = null
+ var testInfo: TestInfo = _
var client: Admin = _
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ce3cd32afde..cbd65e1cf2f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -54,7 +54,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
- protected var admin: Admin = null
+ protected var admin: Admin = _
protected val topic = "topic"
private val numRecords = 100
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 07d9ccb024f..b1e39ebde49 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -37,7 +37,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
private val topic = "topic"
private val numRecords = 2000
- private var broker: KafkaBroker = null
+ private var broker: KafkaBroker = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 322cce6a8dd..e04dd0802a3 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -51,10 +51,10 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
def generateConfigs =
TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
- private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = null
- private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = null
- private var producer3: KafkaProducer[Array[Byte], Array[Byte]] = null
- private var producer4: KafkaProducer[Array[Byte], Array[Byte]] = null
+ private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
+ private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
+ private var producer3: KafkaProducer[Array[Byte], Array[Byte]] = _
+ private var producer4: KafkaProducer[Array[Byte], Array[Byte]] = _
private val topic1 = "topic-1"
private val topic2 = "topic-2"
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index d613b7242ca..01bb6d81504 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -56,7 +56,7 @@ case object Both extends SaslSetupMode
trait SaslSetup {
private val workDir = TestUtils.tempDir()
private val kdcConf = MiniKdc.createConfig
- private var kdc: MiniKdc = null
+ private var kdc: MiniKdc = _
private var serverKeytabFile: Option[File] = None
private var clientKeytabFile: Option[File] = None
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index c4ca966f9ab..0e2280dd705 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -144,8 +144,8 @@ abstract class QuorumTestHarness extends Logging {
val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
- private var testInfo: TestInfo = null
- private var implementation: QuorumImplementation = null
+ private var testInfo: TestInfo = _
+ private var implementation: QuorumImplementation = _
def isKRaftTest(): Boolean = {
TestInfoUtils.isKRaft(testInfo)
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index ea8815ddccc..6113f10ba19 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -105,8 +105,8 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
private val krb5conf = new File(workDir, "krb5.conf")
private var _port = config.getProperty(MiniKdc.KdcPort).toInt
- private var ds: DirectoryService = null
- private var kdc: KdcServer = null
+ private var ds: DirectoryService = _
+ private var kdc: KdcServer = _
private var closed = false
def port: Int = _port
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 74fc8dfd939..dddd33f2d54 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -98,10 +98,10 @@ object ReplicationQuotasTestRig {
val topicName = "my-topic"
var experimentName = "unset"
val partitionId = 0
- var servers: Seq[KafkaServer] = null
+ var servers: Seq[KafkaServer] = _
val leaderRates = mutable.Map[Int, Array[Double]]()
val followerRates = mutable.Map[Int, Array[Double]]()
- var adminClient: Admin = null
+ var adminClient: Admin = _
def startBrokers(brokerIds: Seq[Int]): Unit = {
println("Starting Brokers")
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 4e2bfee60ee..07d00963d52 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -53,7 +53,7 @@ class AddPartitionsTest extends BaseRequestTest {
val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
val topic5 = "new-topic5"
val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
- var admin: Admin = null
+ var admin: Admin = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
index fec6199e8e9..932a4d27b84 100644
--- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -36,7 +36,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
- var adminClient: Admin = null
+ var adminClient: Admin = _
override def brokerCount = 1
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index e88bb321ad9..e0780003b66 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.{BeforeEach, Test}
class ControllerContextTest {
- var context: ControllerContext = null
+ var context: ControllerContext = _
val brokers: Seq[Int] = Seq(1, 2, 3)
val tp1 = new TopicPartition("A", 0)
val tp2 = new TopicPartition("A", 1)
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
index 4f3aec0af71..b1c4378c3b0 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -20,7 +20,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
class PartitionLeaderElectionAlgorithmsTest {
- private var controllerContext: ControllerContext = null
+ private var controllerContext: ControllerContext = _
@BeforeEach
def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 9f11d42e697..566d3173a33 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -35,10 +35,10 @@ import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, verify, when}
class PartitionStateMachineTest {
- private var controllerContext: ControllerContext = null
- private var mockZkClient: KafkaZkClient = null
- private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
- private var partitionStateMachine: PartitionStateMachine = null
+ private var controllerContext: ControllerContext = _
+ private var mockZkClient: KafkaZkClient = _
+ private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = _
+ private var partitionStateMachine: PartitionStateMachine = _
private val brokerId = 5
private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 34187b13842..c616d026228 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -35,10 +35,10 @@ import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, verify, when}
class ReplicaStateMachineTest {
- private var controllerContext: ControllerContext = null
- private var mockZkClient: KafkaZkClient = null
- private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
- private var replicaStateMachine: ReplicaStateMachine = null
+ private var controllerContext: ControllerContext = _
+ private var mockZkClient: KafkaZkClient = _
+ private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = _
+ private var replicaStateMachine: ReplicaStateMachine = _
private val brokerId = 5
private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 164c9ab1fef..6fe3d542e42 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -71,11 +71,11 @@ class GroupCoordinatorTest {
val DefaultRebalanceTimeout = 500
val DefaultSessionTimeout = 500
val GroupInitialRebalanceDelay = 50
- var timer: MockTimer = null
- var groupCoordinator: GroupCoordinator = null
- var replicaManager: ReplicaManager = null
- var scheduler: KafkaScheduler = null
- var zkClient: KafkaZkClient = null
+ var timer: MockTimer = _
+ var groupCoordinator: GroupCoordinator = _
+ var replicaManager: ReplicaManager = _
+ var scheduler: KafkaScheduler = _
+ var zkClient: KafkaZkClient = _
private val groupId = "groupId"
private val protocolType = "consumer"
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 688d6e83b0d..a3298021be0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -54,13 +54,13 @@ import scala.collection._
class GroupMetadataManagerTest {
- var time: MockTime = null
- var replicaManager: ReplicaManager = null
- var groupMetadataManager: GroupMetadataManager = null
- var scheduler: KafkaScheduler = null
- var partition: Partition = null
+ var time: MockTime = _
+ var replicaManager: ReplicaManager = _
+ var groupMetadataManager: GroupMetadataManager = _
+ var scheduler: KafkaScheduler = _
+ var partition: Partition = _
var defaultOffsetRetentionMs = Long.MaxValue
- var metrics: kMetrics = null
+ var metrics: kMetrics = _
val groupId = "foo"
val groupInstanceId = "bar"
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index eb0748d537c..5e7f8ff90be 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -40,7 +40,7 @@ class GroupMetadataTest {
private val rebalanceTimeoutMs = 60000
private val sessionTimeoutMs = 10000
- private var group: GroupMetadata = null
+ private var group: GroupMetadata = _
@BeforeEach
def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index fe1922cc2b8..5dbac1bb4dc 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
* A test harness that brings up some number of broker nodes
*/
abstract class KafkaServerTestHarness extends QuorumTestHarness {
- var instanceConfigs: Seq[KafkaConfig] = null
+ var instanceConfigs: Seq[KafkaConfig] = _
private val _brokers = new mutable.ArrayBuffer[KafkaBroker]
@@ -60,7 +60,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
_brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
}
- var alive: Array[Boolean] = null
+ var alive: Array[Boolean] = _
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index adab6bc88a9..d003510f7e8 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -47,8 +47,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
// reduce test execution time
val enableControlledShutdown = true
- var configProps1: Properties = null
- var configProps2: Properties = null
+ var configProps1: Properties = _
+ var configProps2: Properties = _
var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index d92f1576cd3..c816ac5e009 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -35,7 +35,7 @@ import scala.jdk.CollectionConverters._
class LocalLogTest {
- var config: KafkaConfig = null
+ var config: KafkaConfig = _
val tmpDir: File = TestUtils.tempDir()
val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
val topicPartition = new TopicPartition("test_topic", 1)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 168d6e0b05c..f9409dc949e 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -45,7 +45,7 @@ import scala.collection.{Iterable, Map, mutable}
import scala.jdk.CollectionConverters._
class LogLoaderTest {
- var config: KafkaConfig = null
+ var config: KafkaConfig = _
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
val maxProducerIdExpirationMs: Int = 60 * 60 * 1000
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 001c62aa2dc..6009068185c 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -54,8 +54,8 @@ class LogManagerTest {
logProps.put(LogConfig.RetentionMsProp, maxLogAgeMs: java.lang.Integer)
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
val logConfig = LogConfig(logProps)
- var logDir: File = null
- var logManager: LogManager = null
+ var logDir: File = _
+ var logManager: LogManager = _
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 4a9dcd007cd..96cff3f9cd0 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -34,7 +34,7 @@ import scala.annotation.nowarn
class OffsetIndexTest {
- var idx: OffsetIndex = null
+ var idx: OffsetIndex = _
val maxEntries = 30
val baseOffset = 45L
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 60f0c1ce2c1..6a0d7768002 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -36,8 +36,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{mock, when}
class ProducerStateManagerTest {
- private var logDir: File = null
- private var stateManager: ProducerStateManager = null
+ private var logDir: File = _
+ private var stateManager: ProducerStateManager = _
private val partition = new TopicPartition("test", 0)
private val producerId = 1L
private val maxTransactionTimeoutMs = 5 * 60 * 1000
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
index 3318a529608..04d4adfab7c 100644
--- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
* Unit test for time index.
*/
class TimeIndexTest {
- var idx: TimeIndex = null
+ var idx: TimeIndex = _
val maxEntries = 30
val baseOffset = 45L
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 9db288b5291..15f74c1823e 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -46,7 +46,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
class UnifiedLogTest {
- var config: KafkaConfig = null
+ var config: KafkaConfig = _
val brokerTopicStats = new BrokerTopicStats
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index 0cd67edfd0c..fa828b166f2 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -55,8 +55,8 @@ class DelegationTokenManagerTest extends QuorumTestHarness {
val secretKey = "secretKey"
val maxLifeTimeMsDefault = Defaults.DelegationTokenMaxLifeTimeMsDefault
val renewTimeMsDefault = Defaults.DelegationTokenExpiryTimeMsDefault
- var tokenCache: DelegationTokenCache = null
- var props: Properties = null
+ var tokenCache: DelegationTokenCache = _
+ var props: Properties = _
var createTokenResult: CreateTokenResult = _
var error: Errors = Errors.NONE
diff --git a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
index 4670dc4fd5a..cfcd61dba93 100644
--- a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
class BaseFetchRequestTest extends BaseRequestTest {
- protected var producer: KafkaProducer[String, String] = null
+ protected var producer: KafkaProducer[String, String] = _
override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.FetchMaxBytes, Int.MaxValue.toString)
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 5df53c83736..73b70e3298e 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -32,8 +32,8 @@ import scala.jdk.CollectionConverters._
class DelayedOperationTest {
- var purgatory: DelayedOperationPurgatory[DelayedOperation] = null
- var executorService: ExecutorService = null
+ var purgatory: DelayedOperationPurgatory[DelayedOperation] = _
+ var executorService: ExecutorService = _
@BeforeEach
def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index eaf7a250d0c..2aa2b6c4b13 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Assertions.assertThrows
import scala.concurrent.ExecutionException
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
- var adminClient: Admin = null
+ var adminClient: Admin = _
override def brokerCount = 1
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 950a3039348..6121594cfe5 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -35,7 +35,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
- var adminClient: Admin = null
+ var adminClient: Admin = _
override def brokerCount = 1
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index a6a5af48772..4a6a2b02239 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -33,7 +33,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
- var adminClient: Admin = null
+ var adminClient: Admin = _
override def brokerCount = 1
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 0cf7c1d8e2e..56a3485da40 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -35,7 +35,7 @@ import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class FetchRequestDownConversionConfigTest extends BaseRequestTest {
- private var producer: KafkaProducer[String, String] = null
+ private var producer: KafkaProducer[String, String] = _
override def brokerCount: Int = 2
@BeforeEach
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
index d6915519442..d7ec4c3e365 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
@@ -35,7 +35,7 @@ import scala.jdk.CollectionConverters._
class FetchRequestMaxBytesTest extends BaseRequestTest {
override def brokerCount: Int = 1
- private var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+ private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
private val testTopic = "testTopic"
private val testTopicPartition = new TopicPartition(testTopic, 0)
private val messages = IndexedSeq(
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index bba439cbe71..baed478a0bf 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -52,8 +52,8 @@ class IsrExpirationTest {
val time = new MockTime
val metrics = new Metrics
- var quotaManager: QuotaManagers = null
- var replicaManager: ReplicaManager = null
+ var quotaManager: QuotaManagers = _
+ var replicaManager: ReplicaManager = _
var alterIsrManager: MockAlterPartitionManager = _
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index 26f6a12367a..65d9c3080c3 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -76,8 +76,8 @@ object KafkaMetricReporterClusterIdTest {
}
class KafkaMetricReporterClusterIdTest extends QuorumTestHarness {
- var server: KafkaServer = null
- var config: KafkaConfig = null
+ var server: KafkaServer = _
+ var config: KafkaConfig = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 1adf544819f..40b839fdd38 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -63,8 +63,8 @@ object KafkaMetricsReporterTest {
}
class KafkaMetricsReporterTest extends QuorumTestHarness {
- var broker: KafkaBroker = null
- var config: KafkaConfig = null
+ var broker: KafkaBroker = _
+ var config: KafkaConfig = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 9a6aa7d73fe..37c5a097bc0 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -44,20 +44,20 @@ class LogRecoveryTest extends QuorumTestHarness {
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
- var configs: Seq[KafkaConfig] = null
+ var configs: Seq[KafkaConfig] = _
val topic = "new-topic"
val partitionId = 0
val topicPartition = new TopicPartition(topic, partitionId)
- var server1: KafkaServer = null
- var server2: KafkaServer = null
+ var server1: KafkaServer = _
+ var server2: KafkaServer = _
def configProps1 = configs.head
def configProps2 = configs.last
val message = "hello"
- var producer: KafkaProducer[Integer, String] = null
+ var producer: KafkaProducer[Integer, String] = _
def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
var servers = Seq.empty[KafkaServer]
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 64298d43312..96af81d5591 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -46,9 +46,9 @@ class ReplicationQuotasTest extends QuorumTestHarness {
def percentError(percent: Int, value: Long): Long = Math.round(value * percent / 100.0)
val msg100KB = new Array[Byte](100000)
- var brokers: Seq[KafkaServer] = null
+ var brokers: Seq[KafkaServer] = _
val topic = "topic1"
- var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+ var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
@AfterEach
override def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 096debeedc0..16d9f4dda83 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -29,10 +29,10 @@ import java.io.File
import org.apache.zookeeper.KeeperException.NodeExistsException
class ServerGenerateBrokerIdTest extends QuorumTestHarness {
- var props1: Properties = null
- var config1: KafkaConfig = null
- var props2: Properties = null
- var config2: KafkaConfig = null
+ var props1: Properties = _
+ var config1: KafkaConfig = _
+ var props2: Properties = _
+ var config2: KafkaConfig = _
val brokerMetaPropsFile = "meta.properties"
var servers: Seq[KafkaServer] = Seq()
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index fd9b365c6a3..d2a6df5dda3 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -34,9 +34,9 @@ import org.apache.kafka.test.TestUtils.isValidClusterId
class ServerGenerateClusterIdTest extends QuorumTestHarness {
- var config1: KafkaConfig = null
- var config2: KafkaConfig = null
- var config3: KafkaConfig = null
+ var config1: KafkaConfig = _
+ var config2: KafkaConfig = _
+ var config3: KafkaConfig = _
var servers: Seq[KafkaServer] = Seq()
val brokerMetaPropsFile = "meta.properties"
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index e80084dfe33..62313498d34 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
class ServerStartupTest extends QuorumTestHarness {
- private var server: KafkaServer = null
+ private var server: KafkaServer = _
@AfterEach
override def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index c8c7a89df77..9e347656846 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -54,9 +54,9 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
val topic = "topic1"
val msg = new Array[Byte](1000)
val msgBigger = new Array[Byte](10000)
- var brokers: Seq[KafkaServer] = null
- var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
- var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null
+ var brokers: Seq[KafkaServer] = _
+ var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+ var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index ef2e8200107..b939db2a8c1 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -53,7 +53,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
val t2p0 = new TopicPartition(topic2, 0)
val t2p2 = new TopicPartition(topic2, 2)
val tp = t1p0
- var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+ var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
@AfterEach
override def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 592d02dc4bb..12c093d3be5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -673,7 +673,7 @@ object TestUtils extends Logging {
def stackedIterator[T](s: Iterator[T]*): Iterator[T] = {
new Iterator[T] {
- var cur: Iterator[T] = null
+ var cur: Iterator[T] = _
val topIterator = s.iterator
def hasNext: Boolean = {
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
index 2c08627e2c5..752b3420a7e 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
@@ -36,7 +36,7 @@ class TimerTest {
}
}
- private[this] var timer: Timer = null
+ private[this] var timer: Timer = _
@BeforeEach
def setup(): Unit = {