You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/11/30 15:26:51 UTC

[kafka] branch spotbugs-java11-2.1 created (now 36fff71)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch spotbugs-java11-2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 36fff71  KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943)

This branch includes the following new commits:

     new 347ecd8  MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
     new 36fff71  KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/02: MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch spotbugs-java11-2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 347ecd896fb33d708ad31cd446dd4f9ee5672220
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Nov 10 13:58:18 2018 -0800

    MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
    
    EasyMock 4.0.x includes a change that relies on the caller for inferring
    the return type of mock creator methods. Updated a number of Scala
    tests for compilation and execution to succeed.
    
    The versions of EasyMock and PowerMock in this PR include full support
    for Java 11.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 build.gradle                                       |  13 +-
 .../kafka/server/DelayedFetchTest.scala            |   6 +-
 .../kafka/common/InterBrokerSendThreadTest.scala   |   2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   2 +-
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |   6 +-
 .../admin/ReassignPartitionsCommandTest.scala      |  12 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  10 +-
 .../kafka/controller/ControllerTestUtils.scala     |   2 +-
 .../controller/PartitionStateMachineTest.scala     |   4 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |   6 +-
 .../group/GroupMetadataManagerTest.scala           |  12 +-
 .../transaction/ProducerIdManagerTest.scala        |   2 +-
 .../TransactionCoordinatorConcurrencyTest.scala    |   8 +-
 .../TransactionMarkerChannelManagerTest.scala      |   6 +-
 ...sactionMarkerRequestCompletionHandlerTest.scala |   5 +-
 .../transaction/TransactionStateManagerTest.scala  |   4 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  10 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |   2 +-
 .../kafka/server/AbstractFetcherManagerTest.scala  |   2 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala |   2 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |   7 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   2 +-
 .../server/HighwatermarkPersistenceTest.scala      |   2 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |   8 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  24 ++--
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   8 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 112 ++++++++--------
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 146 ++++++++++-----------
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  16 +--
 .../unit/kafka/server/ReplicaManagerTest.scala     |  10 +-
 .../unit/kafka/server/ServerStartupTest.scala      |   2 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  10 +-
 .../server/ThrottledChannelExpirationTest.scala    |   4 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  11 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala    |   9 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   2 +-
 gradle/dependencies.gradle                         |  11 +-
 37 files changed, 251 insertions(+), 249 deletions(-)

diff --git a/build.gradle b/build.gradle
index e78d2ce..bba3d83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,11 +29,11 @@ buildscript {
     // For Apache Rat plugin to ignore non-Git files
     classpath "org.ajoberstar:grgit:1.9.3"
     classpath 'com.github.ben-manes:gradle-versions-plugin:0.20.0'
-    classpath 'org.scoverage:gradle-scoverage:2.4.0'
-    classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.0'
-    classpath 'org.owasp:dependency-check-gradle:3.3.2'
-    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.15.0"
-    classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.4"
+    classpath 'org.scoverage:gradle-scoverage:2.5.0'
+    classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
+    classpath 'org.owasp:dependency-check-gradle:3.3.4'
+    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0"
+    classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.5"
   }
 }
 
@@ -369,8 +369,7 @@ subprojects {
 
   if (!JavaVersion.current().isJava11Compatible()) {
     spotbugs {
-      // 3.1.6 has a regression that breaks our build, seems to be https://github.com/spotbugs/spotbugs/pull/688
-      toolVersion = '3.1.5'
+      toolVersion = '3.1.8'
       excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
       ignoreFailures = false
     }
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 890ea3b..f7c51f7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -30,8 +30,8 @@ import org.junit.Assert._
 
 class DelayedFetchTest extends EasyMockSupport {
   private val maxBytes = 1024
-  private val replicaManager = mock(classOf[ReplicaManager])
-  private val replicaQuota = mock(classOf[ReplicaQuota])
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
 
   @Test
   def testFetchWithFencedEpoch(): Unit = {
@@ -58,7 +58,7 @@ class DelayedFetchTest extends EasyMockSupport {
       quota = replicaQuota,
       responseCallback = callback)
 
-    val partition = mock(classOf[Partition])
+    val partition: Partition = mock(classOf[Partition])
 
     EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
         .andReturn(partition)
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index 6838653..5c0ea2d 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -30,7 +30,7 @@ import scala.collection.mutable
 
 class InterBrokerSendThreadTest {
   private val time = new MockTime()
-  private val networkClient = EasyMock.createMock(classOf[NetworkClient])
+  private val networkClient: NetworkClient = EasyMock.createMock(classOf[NetworkClient])
   private val completionHandler = new StubCompletionHandler
   private val requestTimeoutMs = 1000
 
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a1c317e..a82657e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock = EasyMock.createNiceMock(classOf[ZkUtils])
+    val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
     EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
     EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
     EasyMock.replay(zkMock)
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index cb261f6..ee4a6ef 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -245,12 +245,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5"))
     val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
     future.complete(util.Collections.singletonMap(resource, new Config(configEntries)))
-    val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
     EasyMock.expect(describeResult.all()).andReturn(future).once()
 
     val alterFuture = new KafkaFutureImpl[Void]
     alterFuture.complete(null)
-    val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
     EasyMock.expect(alterResult.all()).andReturn(alterFuture)
 
     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
@@ -622,7 +622,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testQuotaDescribeEntities() {
-    val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
     def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
       val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 213c23a..0d89430 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -268,7 +268,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.fetchEntityConfig(anyString(), anyString())).andStubReturn(new Properties)
@@ -294,7 +294,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
@@ -328,7 +328,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
@@ -364,7 +364,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(brokers = brokers)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     expect(admin.fetchEntityConfig(is(ConfigType.Topic), anyString())).andStubReturn(new Properties)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()
@@ -399,7 +399,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(brokers = Seq(100, 101))
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andStubReturn(new Properties)
     expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))
@@ -567,7 +567,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
   def stubZKClient(existingAssignment: Map[TopicPartition, Seq[Int]] = Map[TopicPartition, Seq[Int]](),
                    brokers: Seq[Int] = Seq[Int]()): KafkaZkClient = {
-    val zkClient = createMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = createMock(classOf[KafkaZkClient])
     expect(zkClient.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Set[String]])).andStubReturn(existingAssignment)
     expect(zkClient.getAllBrokersInCluster).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1)))
     replay(zkClient)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b5b271e..e05f148 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -75,7 +75,7 @@ class PartitionTest {
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
-    val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+    val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     replicaManager = new ReplicaManager(
       config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
       logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
@@ -370,8 +370,8 @@ class PartitionTest {
                                       isLeader: Boolean,
                                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
 
     val partition = new Partition(topicPartition,
       isOffline = false,
@@ -465,8 +465,8 @@ class PartitionTest {
   def testListOffsetIsolationLevels(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
 
     val partition = new Partition(topicPartition,
       isOffline = false,
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
index b0413a7..84b956d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -22,7 +22,7 @@ object ControllerTestUtils {
 
   /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
   def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
-    val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
+    val mockEvent: ControllerEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
     EasyMock.expect(mockEvent.state).andReturn(controllerState)
     EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
       def answer(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 3370b54..0e8f98e 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -433,7 +433,7 @@ class PartitionStateMachineTest extends JUnitSuite {
       mockZkClient, partitionState, mockControllerBrokerRequestBatch)
 
     def createMockController() = {
-      val mockController = EasyMock.createMock(classOf[KafkaController])
+      val mockController: KafkaController = EasyMock.createMock(classOf[KafkaController])
       EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
       EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
       EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()
@@ -444,7 +444,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     }
 
     val mockController = createMockController()
-    val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager])
+    val mockEventManager: ControllerEventManager = EasyMock.createMock(classOf[ControllerEventManager])
     EasyMock.replay(mockController, replicaStateMachine, mockEventManager)
 
     val topicDeletionManager = new TopicDeletionManager(mockController, mockEventManager, mockZkClient)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c2c0841..c162342 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -928,7 +928,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Empty.toString, summary.state)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1425,7 +1425,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, leaveGroupResult)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1466,7 +1466,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, leaveGroupResult)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 9ab9705..3ab4a13 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -729,7 +729,7 @@ class GroupMetadataManagerTest {
     val tp2 = new TopicPartition("bar", 0)
     val tp3 = new TopicPartition("xxx", 0)
 
-    val logMock =  EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
 
     val segment1MemberId = "a"
@@ -1785,19 +1785,19 @@ class GroupMetadataManagerTest {
       offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
 
     // Prepend empty control batch to valid records
-    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch])
     EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
     EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
     EasyMock.expect(mockBatch.isTransactional).andReturn(true)
     EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
     EasyMock.replay(mockBatch)
-    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    val mockRecords: MemoryRecords = EasyMock.createMock(classOf[MemoryRecords])
     EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
     EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
     EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
     EasyMock.replay(mockRecords)
 
-    val logMock = EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
@@ -1886,7 +1886,7 @@ class GroupMetadataManagerTest {
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val logMock =  EasyMock.mock(classOf[Log])
+    val logMock: Log =  EasyMock.mock(classOf[Log])
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
@@ -1902,7 +1902,7 @@ class GroupMetadataManagerTest {
                                       startOffset: Long,
                                       records: MemoryRecords): Long = {
     val endOffset = startOffset + records.records.asScala.size
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 660e623..b2cc4a5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
 
 class ProducerIdManagerTest {
 
-  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
   @After
   def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index b2a6733..3cf9566 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -81,13 +81,13 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       new MockTimer,
       reaperEnabled = false)
     val brokerNode = new Node(0, "host", 10)
-    val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.anyString(),
       EasyMock.anyInt(),
       EasyMock.anyObject())
     ).andReturn(Some(brokerNode)).anyTimes()
-    val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+    val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
     txnMarkerChannelManager = new TransactionMarkerChannelManager(
       KafkaConfig.fromProps(serverProps),
       metadataCache,
@@ -246,8 +246,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
 
   private def prepareTxnLog(partitionId: Int): Unit = {
 
-    val logMock =  EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val logMock: Log =  EasyMock.mock(classOf[Log])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 454b361..44d5c5f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 class TransactionMarkerChannelManagerTest {
-  private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
-  private val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
-  private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+  private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
 
   private val partition1 = new TopicPartition("topic1", 0)
   private val partition2 = new TopicPartition("topic1", 1)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 3ca6c1b..85159c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -47,9 +47,10 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerEpoch, txnTimeoutMs,
     PrepareCommit, mutable.Set[TopicPartition](topicPartition), 0L, 0L)
 
-  private val markerChannelManager = EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
+  private val markerChannelManager: TransactionMarkerChannelManager =
+    EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
 
-  private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
 
   private val handler = new TransactionMarkerRequestCompletionHandler(brokerId, txnStateManager, markerChannelManager, txnIdAndMarkers)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index d2fe7ea..ba8fe1d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -574,8 +574,8 @@ class TransactionStateManagerTest {
                             records: MemoryRecords): Unit = {
     EasyMock.reset(replicaManager)
 
-    val logMock =  EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     val endOffset = startOffset + records.records.asScala.size
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7728998..9b39567 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -325,7 +325,7 @@ class LogTest {
   @Test
   def testSizeForLargeLogs(): Unit = {
     val largeSize = Int.MaxValue.toLong * 2
-    val logSegment = EasyMock.createMock(classOf[LogSegment])
+    val logSegment: LogSegment = EasyMock.createMock(classOf[LogSegment])
 
     EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
     EasyMock.replay(logSegment)
@@ -347,7 +347,7 @@ class LogTest {
 
   @Test
   def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     // Load the log
     EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
@@ -426,7 +426,7 @@ class LogTest {
 
   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()
@@ -463,7 +463,7 @@ class LogTest {
 
   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()
@@ -503,7 +503,7 @@ class LogTest {
 
   @Test
   def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
 
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9afb145..b49b5e1 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -754,7 +754,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 145.toShort
     val baseOffset = 15
 
-    val batch = EasyMock.createMock(classOf[RecordBatch])
+    val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
     EasyMock.expect(batch.isControlBatch).andReturn(true).once
     EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
     EasyMock.replay(batch)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index cd00ff1..0a4d7c1 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -26,7 +26,7 @@ class AbstractFetcherManagerTest {
 
   @Test
   def testAddAndRemovePartition(): Unit = {
-    val fetcher = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
     val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
       override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
         fetcher
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 6f75174..e10d4b2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -59,7 +59,7 @@ class ClientQuotaManagerTest {
 
     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
-    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+    val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
 
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 41b9055..45ef18f 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -21,6 +21,7 @@ import java.util
 import java.util.Properties
 
 import kafka.utils.TestUtils
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
@@ -303,7 +304,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
   def testDynamicListenerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
-    val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
     EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
     EasyMock.replay(kafkaServer)
 
@@ -328,7 +329,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
 
   @Test
   def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
-    val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
     EasyMock.replay(zkClient)
 
@@ -362,4 +363,4 @@ class TestDynamicThreadPool() extends BrokerReconfigurable {
     assertEquals(10, newConfig.numIoThreads)
     assertEquals(100, newConfig.backgroundThreads)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cabe0a9..789dbae 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -210,7 +210,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Create a mock ConfigHandler to record config changes it is asked to process
     val entityArgument = EasyMock.newCapture[String]
     val propertiesArgument = EasyMock.newCapture[Properties]
-    val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
+    val handler: ConfigHandler = EasyMock.createNiceMock(classOf[ConfigHandler])
     handler.processConfigChanges(
       EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
       EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 7451234..61cbd2c 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
-  val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
+  val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)),
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 3dff709..d5bdf14 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -55,7 +55,7 @@ class IsrExpirationTest {
 
   @Before
   def setUp() {
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     EasyMock.replay(logManager)
 
@@ -252,8 +252,8 @@ class IsrExpirationTest {
   }
 
   private def logMock: Log = {
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
+    val log: Log = EasyMock.createMock(classOf[Log])
+    val cache: LeaderEpochFileCache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b903c4a..be30f8a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -55,24 +55,24 @@ import scala.collection.Map
 
 class KafkaApisTest {
 
-  private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
-  private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
-  private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-  private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
-  private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
-  private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
-  private val controller = EasyMock.createNiceMock(classOf[KafkaController])
-  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+  private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+  private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
+  private val adminManager: AdminManager = EasyMock.createNiceMock(classOf[AdminManager])
+  private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
+  private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
+  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
   private val metrics = new Metrics()
   private val brokerId = 1
   private val metadataCache = new MetadataCache(brokerId)
   private val authorizer: Option[Authorizer] = None
-  private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
-  private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
-  private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+  private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
+  private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+  private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
   private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
     replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
-  private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager])
+  private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager])
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"
   private val time = new MockTime
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 740b28e..50449dc 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -211,8 +211,8 @@ class LogOffsetTest extends BaseRequestTest {
    * a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegmentSize() {
-    val log = EasyMock.niceMock(classOf[Log])
-    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    val log: Log = EasyMock.niceMock(classOf[Log])
+    val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] {
       private val value = new AtomicInteger(0)
       def answer: Int = value.getAndIncrement()
@@ -228,8 +228,8 @@ class LogOffsetTest extends BaseRequestTest {
    * different (simulating a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegments() {
-    val log = EasyMock.niceMock(classOf[Log])
-    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    val log: Log = EasyMock.niceMock(classOf[Log])
+    val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(log.logSegments).andStubAnswer {
       new IAnswer[Iterable[LogSegment]] {
         def answer = new Iterable[LogSegment] {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5d92b61..a4fbaf2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -50,9 +50,9 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val partitionT1p0 = createMock(classOf[Partition])
-    val partitionT1p1 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val partitionT1p1: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpochT1p0 = 2
     val leaderEpochT1p1 = 5
@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val partitionT1p0 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 2
     val leo = 13
@@ -149,18 +149,18 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
-    val futureReplicaLeaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaT1p0 = createNiceMock(classOf[Replica])
-    val replicaT1p1 = createNiceMock(classOf[Replica])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochsT1p0: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochsT1p1: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaT1p0: Replica = createNiceMock(classOf[Replica])
+    val replicaT1p1: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplicaT1p0 = createNiceMock(classOf[Replica])
-    val futureReplicaT1p1 = createNiceMock(classOf[Replica])
-    val partitionT1p0 = createMock(classOf[Partition])
-    val partitionT1p1 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val futureReplicaT1p0: Replica = createNiceMock(classOf[Replica])
+    val futureReplicaT1p1: Replica = createNiceMock(classOf[Replica])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val partitionT1p1: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val leaderEpoch = 2
@@ -228,14 +228,14 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val leaderEpoch = 5
@@ -301,13 +301,13 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val initialFetchOffset = 100
@@ -356,13 +356,13 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[kafka.log.LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val futureReplicaLeaderEpoch = 1
@@ -439,13 +439,13 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val leaderEpoch = 5
@@ -494,12 +494,12 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     //Stubs
     expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
@@ -543,12 +543,12 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     //Stubs
     expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index a370101..c65c254 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -79,7 +79,7 @@ class ReplicaFetcherThreadTest {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint = createMock(classOf[BlockingSend])
+    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
     expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
       override def answer(): ClientResponse = {
         toFail = true // assert no leader request is sent
@@ -123,7 +123,7 @@ class ReplicaFetcherThreadTest {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
     val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint = createMock(classOf[BlockingSend])
+    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
 
     expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
       override def answer(): ClientResponse = {
@@ -162,13 +162,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
 
@@ -261,7 +261,7 @@ class ReplicaFetcherThreadTest {
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
-    val mockBlockingSend = createMock(classOf[BlockingSend])
+    val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
 
     expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
     replay(mockBlockingSend)
@@ -295,12 +295,12 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
 
@@ -355,13 +355,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
     val initialLEO = 200
@@ -404,13 +404,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpochAtFollower = 5
     val leaderEpochAtLeader = 4
@@ -458,13 +458,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     // Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialLEO = 200
 
@@ -529,13 +529,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(props)
 
     // Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialLEO = 200
 
@@ -590,13 +590,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialFetchOffset = 100
     val initialLeo = 300
@@ -635,13 +635,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[kafka.log.LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
     val highWaterMark = 100
@@ -693,13 +693,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all stubs
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createNiceMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
     val leaderEpoch = 4
 
@@ -746,13 +746,13 @@ class ReplicaFetcherThreadTest {
     val initialLEO = 100
 
     //Setup all stubs
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createNiceMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
     //Stub return values
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
@@ -796,7 +796,7 @@ class ReplicaFetcherThreadTest {
   def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
-    val mockBlockingSend = createMock(classOf[BlockingSend])
+    val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
 
     expect(mockBlockingSend.initiateClose()).andThrow(new IllegalArgumentException()).once()
     expect(mockBlockingSend.close()).andThrow(new IllegalStateException()).once()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9b59e71..50b5fa7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -21,7 +21,7 @@ import java.util.{Optional, Properties}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
-import kafka.log.{Log, LogOffsetSnapshot}
+import kafka.log.{Log, LogManager, LogOffsetSnapshot}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
@@ -151,7 +151,7 @@ class ReplicaManagerQuotasTest {
     // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
     def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
       val endOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
-      val partition = EasyMock.createMock(classOf[Partition])
+      val partition: Partition = EasyMock.createMock(classOf[Partition])
 
       val offsetSnapshot = LogOffsetSnapshot(
         logStartOffset = 0L,
@@ -161,7 +161,7 @@ class ReplicaManagerQuotasTest {
       EasyMock.expect(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true))
           .andReturn(offsetSnapshot)
 
-      val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+      val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
       EasyMock.expect(replicaManager.getPartitionOrException(
         EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean()))
         .andReturn(partition).anyTimes()
@@ -192,11 +192,11 @@ class ReplicaManagerQuotasTest {
   }
 
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
-    val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
-    val scheduler = createNiceMock(classOf[KafkaScheduler])
+    val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+    val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
 
     //Create log which handles both a regular read and a 0 bytes read
-    val log = createNiceMock(classOf[Log])
+    val log: Log = createNiceMock(classOf[Log])
     expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
@@ -225,7 +225,7 @@ class ReplicaManagerQuotasTest {
     replay(log)
 
     //Create log manager
-    val logManager = createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
 
     //Return the same log for each partition as it doesn't matter
     expect(logManager.getLog(anyObject(), anyBoolean())).andReturn(Some(log)).anyTimes()
@@ -263,7 +263,7 @@ class ReplicaManagerQuotasTest {
   }
 
   def mockQuota(bound: Long): ReplicaQuota = {
-    val quota = createMock(classOf[ReplicaQuota])
+    val quota: ReplicaQuota = createMock(classOf[ReplicaQuota])
     expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes()
     quota
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4401748..c4ca2cb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -148,7 +148,7 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
@@ -594,7 +594,7 @@ class ReplicaManagerTest {
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
-    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
+    val mockLeaderEpochCache: LeaderEpochFileCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
@@ -620,7 +620,7 @@ class ReplicaManagerTest {
     }
 
     // Expect to call LogManager.truncateTo exactly once
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
     EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
     EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition),
@@ -634,7 +634,7 @@ class ReplicaManagerTest {
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
     val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
 
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
     aliveBrokerIds.foreach { brokerId =>
       EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes
@@ -793,7 +793,7 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     aliveBrokerIds.foreach { brokerId =>
       EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 67d083c..1bc0257 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -105,7 +105,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
   @Test
   def testBrokerStateRunningAfterZK(): Unit = {
     val brokerId = 0
-    val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
+    val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])
 
     class BrokerStateInterceptor() extends BrokerState {
       override def newState(newState: BrokerStates): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ecfbd73..d0ff46b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import kafka.api._
 import kafka.utils._
 import kafka.cluster.Replica
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.metrics.Metrics
@@ -71,15 +71,15 @@ class SimpleFetchTest {
   @Before
   def setUp() {
     // create nice mock since we don't particularly care about zkclient calls
-    val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.replay(kafkaZkClient)
 
     // create nice mock since we don't particularly care about scheduler calls
-    val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
+    val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
     EasyMock.replay(scheduler)
 
     // create the log which takes read with either HW max offset or none max offset
-    val log = EasyMock.createNiceMock(classOf[Log])
+    val log: Log = EasyMock.createNiceMock(classOf[Log])
     EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
@@ -107,7 +107,7 @@ class SimpleFetchTest {
     EasyMock.replay(log)
 
     // create the log manager that is aware of this mock log
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
     EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     EasyMock.replay(logManager)
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index ff781a2..c46404a 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -50,7 +50,7 @@ class ThrottledChannelExpirationTest {
 
     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
-    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+    val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
 
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
@@ -122,4 +122,4 @@ class ThrottledChannelExpirationTest {
       time.sleep(10)
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 86a087b..7adc204 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -21,6 +21,7 @@ import java.util.Optional
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Replica
+import kafka.log.{Log, LogManager}
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -46,9 +47,9 @@ class OffsetsForLeaderEpochTest {
     val request = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), epochRequested))
 
     //Stubs
-    val mockLog = createNiceMock(classOf[kafka.log.Log])
-    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    val mockLog: Log = createNiceMock(classOf[Log])
+    val mockCache: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
@@ -72,7 +73,7 @@ class OffsetsForLeaderEpochTest {
 
   @Test
   def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
-    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
@@ -95,7 +96,7 @@ class OffsetsForLeaderEpochTest {
 
   @Test
   def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
-    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 65273eb..4bf7471 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,9 +17,10 @@
 
 package kafka.utils
 
-import kafka.server.{KafkaConfig, ReplicaFetcherManager}
+import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager}
 import kafka.api.LeaderAndIsr
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.log.{Log, LogManager}
 import kafka.zk._
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
@@ -48,16 +49,16 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
   @Test
   def testUpdateLeaderAndIsr() {
     val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    val log: Log = EasyMock.createMock(classOf[Log])
     EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
     EasyMock.expect(log)
     EasyMock.replay(log)
 
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
-    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 39745e5..aca538b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -138,7 +138,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val zkMock: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
     EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
     EasyMock.replay(zkMock)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index e11ded1..7dd3604 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -52,7 +52,7 @@ versions += [
   apacheds: "2.0.0-M24",
   argparse4j: "0.7.0",
   bcpkix: "1.60",
-  easymock: "3.6",
+  easymock: "4.0.1",
   jackson: "2.9.7",
   jetty: "9.4.12.v20180830",
   jersey: "2.27",
@@ -73,20 +73,19 @@ versions += [
   kafka_11: "1.1.1",
   kafka_20: "2.0.0",
   lz4: "1.5.0",
-  mavenArtifact: "3.5.4",
+  mavenArtifact: "3.6.0",
   metrics: "2.2.0",
   mockito: "2.23.0",
-  // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
-  powermock: "2.0.0-beta.5",
+  powermock: "2.0.0-RC.3",
   reflections: "0.9.11",
   rocksDB: "5.14.2",
   scalatest: "3.0.5",
   scoverage: "1.3.1",
   slf4j: "1.7.25",
   snappy: "1.1.7.2",
-  zkclient: "0.10",
+  zkclient: "0.11",
   zookeeper: "3.4.13",
-  zstd: "1.3.5-4"
+  zstd: "1.3.7-1"
 ]
 
 libs += [


[kafka] 02/02: KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943)

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch spotbugs-java11-2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 36fff71d7f0b5fe2cd73d3ad9ba4908eea276620
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Nov 27 14:40:17 2018 -0800

    KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943)
    
    See https://github.com/spotbugs/spotbugs/issues/756 for details on
    the false positives affecting try with resources. An example is:
    
    > RCN | Nullcheck of fc at line 629 of value previously dereferenced in
    > org.apache.kafka.common.utils.Utils.readFileAsString(String, Charset)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 build.gradle                | 27 ++++++++++++---------------
 gradle/spotbugs-exclude.xml | 11 +++++++++++
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/build.gradle b/build.gradle
index bba3d83..98b0e38 100644
--- a/build.gradle
+++ b/build.gradle
@@ -149,8 +149,7 @@ subprojects {
   apply plugin: 'maven'
   apply plugin: 'signing'
   apply plugin: 'checkstyle'
-  if (!JavaVersion.current().isJava11Compatible())
-    apply plugin: "com.github.spotbugs"
+  apply plugin: "com.github.spotbugs"
 
   sourceCompatibility = minJavaVersion
   targetCompatibility = minJavaVersion
@@ -367,20 +366,18 @@ subprojects {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 
-  if (!JavaVersion.current().isJava11Compatible()) {
-    spotbugs {
-      toolVersion = '3.1.8'
-      excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
-      ignoreFailures = false
-    }
-    test.dependsOn('spotbugsMain')
+  spotbugs {
+    toolVersion = '3.1.8'
+    excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
+    ignoreFailures = false
+  }
+  test.dependsOn('spotbugsMain')
 
-    tasks.withType(com.github.spotbugs.SpotBugsTask) {
-      reports {
-        // Continue supporting `xmlFindBugsReport` for compatibility
-        xml.enabled(project.hasProperty('xmlSpotBugsReport') || project.hasProperty('xmlFindBugsReport'))
-        html.enabled(!project.hasProperty('xmlSpotBugsReport') && !project.hasProperty('xmlFindBugsReport'))
-      }
+  tasks.withType(com.github.spotbugs.SpotBugsTask) {
+    reports {
+      // Continue supporting `xmlFindBugsReport` for compatibility
+      xml.enabled(project.hasProperty('xmlSpotBugsReport') || project.hasProperty('xmlFindBugsReport'))
+      html.enabled(!project.hasProperty('xmlSpotBugsReport') && !project.hasProperty('xmlFindBugsReport'))
     }
   }
 
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index d83c4c4..a954baf 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -23,6 +23,17 @@ This file dictates which categories of bugs and individual false positives that
 For a detailed description of spotbugs bug categories, see https://spotbugs.readthedocs.io/en/latest/bugDescriptions.html
 -->
 <FindBugsFilter>
+
+    <!-- false positive in Java 11, see https://github.com/spotbugs/spotbugs/issues/756 -->
+    <Match>
+        <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"/>
+    </Match>
+
+    <!-- false positive in Java 11, see https://github.com/spotbugs/spotbugs/issues/756 -->
+    <Match>
+        <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"/>
+    </Match>
+
     <Match>
         <!-- Disable warnings about mutable objects and the use of public fields.
             EI_EXPOSE_REP: May expose internal representation by returning reference to mutable object