You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/11/30 15:26:52 UTC
[kafka] 01/02: MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch spotbugs-java11-2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 347ecd896fb33d708ad31cd446dd4f9ee5672220
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Nov 10 13:58:18 2018 -0800
MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
EasyMock 4.0.x changes its mock creator methods so that their return type
is inferred from the call site. A number of Scala tests now declare the
type of each mock explicitly so that they compile and run successfully.
The versions of EasyMock and PowerMock in this PR include full support
for Java 11.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
build.gradle | 13 +-
.../kafka/server/DelayedFetchTest.scala | 6 +-
.../kafka/common/InterBrokerSendThreadTest.scala | 2 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 2 +-
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 6 +-
.../admin/ReassignPartitionsCommandTest.scala | 12 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 10 +-
.../kafka/controller/ControllerTestUtils.scala | 2 +-
.../controller/PartitionStateMachineTest.scala | 4 +-
.../coordinator/group/GroupCoordinatorTest.scala | 6 +-
.../group/GroupMetadataManagerTest.scala | 12 +-
.../transaction/ProducerIdManagerTest.scala | 2 +-
.../TransactionCoordinatorConcurrencyTest.scala | 8 +-
.../TransactionMarkerChannelManagerTest.scala | 6 +-
...sactionMarkerRequestCompletionHandlerTest.scala | 5 +-
.../transaction/TransactionStateManagerTest.scala | 4 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 10 +-
.../unit/kafka/log/ProducerStateManagerTest.scala | 2 +-
.../kafka/server/AbstractFetcherManagerTest.scala | 2 +-
.../unit/kafka/server/ClientQuotaManagerTest.scala | 2 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 7 +-
.../kafka/server/DynamicConfigChangeTest.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 2 +-
.../unit/kafka/server/ISRExpirationTest.scala | 8 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 24 ++--
.../scala/unit/kafka/server/LogOffsetTest.scala | 8 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 112 ++++++++--------
.../kafka/server/ReplicaFetcherThreadTest.scala | 146 ++++++++++-----------
.../kafka/server/ReplicaManagerQuotasTest.scala | 16 +--
.../unit/kafka/server/ReplicaManagerTest.scala | 10 +-
.../unit/kafka/server/ServerStartupTest.scala | 2 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 10 +-
.../server/ThrottledChannelExpirationTest.scala | 4 +-
.../server/epoch/OffsetsForLeaderEpochTest.scala | 11 +-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 9 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 2 +-
gradle/dependencies.gradle | 11 +-
37 files changed, 251 insertions(+), 249 deletions(-)
diff --git a/build.gradle b/build.gradle
index e78d2ce..bba3d83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,11 +29,11 @@ buildscript {
// For Apache Rat plugin to ignore non-Git files
classpath "org.ajoberstar:grgit:1.9.3"
classpath 'com.github.ben-manes:gradle-versions-plugin:0.20.0'
- classpath 'org.scoverage:gradle-scoverage:2.4.0'
- classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.0'
- classpath 'org.owasp:dependency-check-gradle:3.3.2'
- classpath "com.diffplug.spotless:spotless-plugin-gradle:3.15.0"
- classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.4"
+ classpath 'org.scoverage:gradle-scoverage:2.5.0'
+ classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
+ classpath 'org.owasp:dependency-check-gradle:3.3.4'
+ classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0"
+ classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.5"
}
}
@@ -369,8 +369,7 @@ subprojects {
if (!JavaVersion.current().isJava11Compatible()) {
spotbugs {
- // 3.1.6 has a regression that breaks our build, seems to be https://github.com/spotbugs/spotbugs/pull/688
- toolVersion = '3.1.5'
+ toolVersion = '3.1.8'
excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
ignoreFailures = false
}
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 890ea3b..f7c51f7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -30,8 +30,8 @@ import org.junit.Assert._
class DelayedFetchTest extends EasyMockSupport {
private val maxBytes = 1024
- private val replicaManager = mock(classOf[ReplicaManager])
- private val replicaQuota = mock(classOf[ReplicaQuota])
+ private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
@Test
def testFetchWithFencedEpoch(): Unit = {
@@ -58,7 +58,7 @@ class DelayedFetchTest extends EasyMockSupport {
quota = replicaQuota,
responseCallback = callback)
- val partition = mock(classOf[Partition])
+ val partition: Partition = mock(classOf[Partition])
EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
.andReturn(partition)
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index 6838653..5c0ea2d 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -30,7 +30,7 @@ import scala.collection.mutable
class InterBrokerSendThreadTest {
private val time = new MockTime()
- private val networkClient = EasyMock.createMock(classOf[NetworkClient])
+ private val networkClient: NetworkClient = EasyMock.createMock(classOf[NetworkClient])
private val completionHandler = new StubCompletionHandler
private val requestTimeoutMs = 1000
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a1c317e..a82657e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val topic = "test.topic"
// simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
- val zkMock = EasyMock.createNiceMock(classOf[ZkUtils])
+ val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
EasyMock.replay(zkMock)
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index cb261f6..ee4a6ef 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -245,12 +245,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5"))
val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
future.complete(util.Collections.singletonMap(resource, new Config(configEntries)))
- val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+ val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
EasyMock.expect(describeResult.all()).andReturn(future).once()
val alterFuture = new KafkaFutureImpl[Void]
alterFuture.complete(null)
- val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+ val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
EasyMock.expect(alterResult.all()).andReturn(alterFuture)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
@@ -622,7 +622,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def testQuotaDescribeEntities() {
- val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 213c23a..0d89430 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -268,7 +268,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
//Setup
val zk = stubZKClient(existing)
- val admin = createMock(classOf[AdminZkClient])
+ val admin: AdminZkClient = createMock(classOf[AdminZkClient])
val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
expect(admin.fetchEntityConfig(anyString(), anyString())).andStubReturn(new Properties)
@@ -294,7 +294,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
//Setup
val zk = stubZKClient(existing)
- val admin = createMock(classOf[AdminZkClient])
+ val admin: AdminZkClient = createMock(classOf[AdminZkClient])
val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
@@ -328,7 +328,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
//Setup
val zk = stubZKClient(existing)
- val admin = createMock(classOf[AdminZkClient])
+ val admin: AdminZkClient = createMock(classOf[AdminZkClient])
val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
@@ -364,7 +364,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
//Setup
val zk = stubZKClient(brokers = brokers)
- val admin = createMock(classOf[AdminZkClient])
+ val admin: AdminZkClient = createMock(classOf[AdminZkClient])
val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
expect(admin.fetchEntityConfig(is(ConfigType.Topic), anyString())).andStubReturn(new Properties)
expect(admin.changeBrokerConfig(anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()
@@ -399,7 +399,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
//Setup
val zk = stubZKClient(brokers = Seq(100, 101))
- val admin = createMock(classOf[AdminZkClient])
+ val admin: AdminZkClient = createMock(classOf[AdminZkClient])
val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andStubReturn(new Properties)
expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))
@@ -567,7 +567,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
def stubZKClient(existingAssignment: Map[TopicPartition, Seq[Int]] = Map[TopicPartition, Seq[Int]](),
brokers: Seq[Int] = Seq[Int]()): KafkaZkClient = {
- val zkClient = createMock(classOf[KafkaZkClient])
+ val zkClient: KafkaZkClient = createMock(classOf[KafkaZkClient])
expect(zkClient.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Set[String]])).andStubReturn(existingAssignment)
expect(zkClient.getAllBrokersInCluster).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1)))
replay(zkClient)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b5b271e..e05f148 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -75,7 +75,7 @@ class PartitionTest {
val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
val brokerConfig = KafkaConfig.fromProps(brokerProps)
- val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+ val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
replicaManager = new ReplicaManager(
config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
@@ -370,8 +370,8 @@ class PartitionTest {
isLeader: Boolean,
log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
- val replicaManager = EasyMock.mock(classOf[ReplicaManager])
- val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+ val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+ val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
val partition = new Partition(topicPartition,
isOffline = false,
@@ -465,8 +465,8 @@ class PartitionTest {
def testListOffsetIsolationLevels(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, logConfig)
val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
- val replicaManager = EasyMock.mock(classOf[ReplicaManager])
- val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+ val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+ val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
val partition = new Partition(topicPartition,
isOffline = false,
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
index b0413a7..84b956d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -22,7 +22,7 @@ object ControllerTestUtils {
/** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
- val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
+ val mockEvent: ControllerEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
EasyMock.expect(mockEvent.state).andReturn(controllerState)
EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
def answer(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 3370b54..0e8f98e 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -433,7 +433,7 @@ class PartitionStateMachineTest extends JUnitSuite {
mockZkClient, partitionState, mockControllerBrokerRequestBatch)
def createMockController() = {
- val mockController = EasyMock.createMock(classOf[KafkaController])
+ val mockController: KafkaController = EasyMock.createMock(classOf[KafkaController])
EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()
@@ -444,7 +444,7 @@ class PartitionStateMachineTest extends JUnitSuite {
}
val mockController = createMockController()
- val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager])
+ val mockEventManager: ControllerEventManager = EasyMock.createMock(classOf[ControllerEventManager])
EasyMock.replay(mockController, replicaStateMachine, mockEventManager)
val topicDeletionManager = new TopicDeletionManager(mockController, mockEventManager, mockZkClient)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c2c0841..c162342 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -928,7 +928,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Empty.toString, summary.state)
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
- val partition = EasyMock.niceMock(classOf[Partition])
+ val partition: Partition = EasyMock.niceMock(classOf[Partition])
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1425,7 +1425,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, leaveGroupResult)
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
- val partition = EasyMock.niceMock(classOf[Partition])
+ val partition: Partition = EasyMock.niceMock(classOf[Partition])
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1466,7 +1466,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, leaveGroupResult)
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
- val partition = EasyMock.niceMock(classOf[Partition])
+ val partition: Partition = EasyMock.niceMock(classOf[Partition])
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 9ab9705..3ab4a13 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -729,7 +729,7 @@ class GroupMetadataManagerTest {
val tp2 = new TopicPartition("bar", 0)
val tp3 = new TopicPartition("xxx", 0)
- val logMock = EasyMock.mock(classOf[Log])
+ val logMock: Log = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
val segment1MemberId = "a"
@@ -1785,19 +1785,19 @@ class GroupMetadataManagerTest {
offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
// Prepend empty control batch to valid records
- val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+ val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch])
EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
EasyMock.expect(mockBatch.isTransactional).andReturn(true)
EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
EasyMock.replay(mockBatch)
- val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+ val mockRecords: MemoryRecords = EasyMock.createMock(classOf[MemoryRecords])
EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
EasyMock.replay(mockRecords)
- val logMock = EasyMock.mock(classOf[Log])
+ val logMock: Log = EasyMock.mock(classOf[Log])
EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
maxLength = EasyMock.anyInt(),
@@ -1886,7 +1886,7 @@ class GroupMetadataManagerTest {
private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
startOffset: Long,
records: MemoryRecords): Unit = {
- val logMock = EasyMock.mock(classOf[Log])
+ val logMock: Log = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
@@ -1902,7 +1902,7 @@ class GroupMetadataManagerTest {
startOffset: Long,
records: MemoryRecords): Long = {
val endOffset = startOffset + records.records.asScala.size
- val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+ val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 660e623..b2cc4a5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
class ProducerIdManagerTest {
- private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
@After
def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index b2a6733..3cf9566 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -81,13 +81,13 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
new MockTimer,
reaperEnabled = false)
val brokerNode = new Node(0, "host", 10)
- val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+ val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
EasyMock.anyString(),
EasyMock.anyInt(),
EasyMock.anyObject())
).andReturn(Some(brokerNode)).anyTimes()
- val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+ val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
txnMarkerChannelManager = new TransactionMarkerChannelManager(
KafkaConfig.fromProps(serverProps),
metadataCache,
@@ -246,8 +246,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
private def prepareTxnLog(partitionId: Int): Unit = {
- val logMock = EasyMock.mock(classOf[Log])
- val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+ val logMock: Log = EasyMock.mock(classOf[Log])
+ val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 454b361..44d5c5f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
class TransactionMarkerChannelManagerTest {
- private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
- private val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
- private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+ private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+ private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+ private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
private val partition1 = new TopicPartition("topic1", 0)
private val partition2 = new TopicPartition("topic1", 1)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 3ca6c1b..85159c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -47,9 +47,10 @@ class TransactionMarkerRequestCompletionHandlerTest {
private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerEpoch, txnTimeoutMs,
PrepareCommit, mutable.Set[TopicPartition](topicPartition), 0L, 0L)
- private val markerChannelManager = EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
+ private val markerChannelManager: TransactionMarkerChannelManager =
+ EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
- private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+ private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
private val handler = new TransactionMarkerRequestCompletionHandler(brokerId, txnStateManager, markerChannelManager, txnIdAndMarkers)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index d2fe7ea..ba8fe1d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -574,8 +574,8 @@ class TransactionStateManagerTest {
records: MemoryRecords): Unit = {
EasyMock.reset(replicaManager)
- val logMock = EasyMock.mock(classOf[Log])
- val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+ val logMock: Log = EasyMock.mock(classOf[Log])
+ val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
val endOffset = startOffset + records.records.asScala.size
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7728998..9b39567 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -325,7 +325,7 @@ class LogTest {
@Test
def testSizeForLargeLogs(): Unit = {
val largeSize = Int.MaxValue.toLong * 2
- val logSegment = EasyMock.createMock(classOf[LogSegment])
+ val logSegment: LogSegment = EasyMock.createMock(classOf[LogSegment])
EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
EasyMock.replay(logSegment)
@@ -347,7 +347,7 @@ class LogTest {
@Test
def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = {
- val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+ val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
// Load the log
EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
@@ -426,7 +426,7 @@ class LogTest {
@Test
def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
- val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+ val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
stateManager.updateMapEndOffset(0L)
EasyMock.expectLastCall().anyTimes()
@@ -463,7 +463,7 @@ class LogTest {
@Test
def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
- val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+ val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
stateManager.updateMapEndOffset(0L)
EasyMock.expectLastCall().anyTimes()
@@ -503,7 +503,7 @@ class LogTest {
@Test
def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
- val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+ val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9afb145..b49b5e1 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -754,7 +754,7 @@ class ProducerStateManagerTest extends JUnitSuite {
val producerEpoch = 145.toShort
val baseOffset = 15
- val batch = EasyMock.createMock(classOf[RecordBatch])
+ val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
EasyMock.expect(batch.isControlBatch).andReturn(true).once
EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
EasyMock.replay(batch)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index cd00ff1..0a4d7c1 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -26,7 +26,7 @@ class AbstractFetcherManagerTest {
@Test
def testAddAndRemovePartition(): Unit = {
- val fetcher = EasyMock.mock(classOf[AbstractFetcherThread])
+ val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
fetcher
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 6f75174..e10d4b2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -59,7 +59,7 @@ class ClientQuotaManagerTest {
val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
- val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+ val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 41b9055..45ef18f 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -21,6 +21,7 @@ import java.util
import java.util.Properties
import kafka.utils.TestUtils
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
@@ -303,7 +304,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
def testDynamicListenerConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val oldConfig = KafkaConfig.fromProps(props)
- val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+ val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
EasyMock.replay(kafkaServer)
@@ -328,7 +329,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
@Test
def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
- val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+ val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
EasyMock.replay(zkClient)
@@ -362,4 +363,4 @@ class TestDynamicThreadPool() extends BrokerReconfigurable {
assertEquals(10, newConfig.numIoThreads)
assertEquals(100, newConfig.backgroundThreads)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cabe0a9..789dbae 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -210,7 +210,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
// Create a mock ConfigHandler to record config changes it is asked to process
val entityArgument = EasyMock.newCapture[String]
val propertiesArgument = EasyMock.newCapture[Properties]
- val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
+ val handler: ConfigHandler = EasyMock.createNiceMock(classOf[ConfigHandler])
handler.processConfigChanges(
EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 7451234..61cbd2c 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest {
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo"
- val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
+ val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
val logManagers = configs map { config =>
TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)),
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 3dff709..d5bdf14 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.{Partition, Replica}
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils._
import org.apache.kafka.common.TopicPartition
@@ -55,7 +55,7 @@ class IsrExpirationTest {
@Before
def setUp() {
- val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+ val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
EasyMock.replay(logManager)
@@ -252,8 +252,8 @@ class IsrExpirationTest {
}
private def logMock: Log = {
- val log = EasyMock.createMock(classOf[kafka.log.Log])
- val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
+ val log: Log = EasyMock.createMock(classOf[Log])
+ val cache: LeaderEpochFileCache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
EasyMock.expect(log.onHighWatermarkIncremented(0L))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b903c4a..be30f8a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -55,24 +55,24 @@ import scala.collection.Map
class KafkaApisTest {
- private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
- private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
- private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
- private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
- private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
- private val controller = EasyMock.createNiceMock(classOf[KafkaController])
- private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+ private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+ private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+ private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
+ private val adminManager: AdminManager = EasyMock.createNiceMock(classOf[AdminManager])
+ private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
+ private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
+ private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
private val metrics = new Metrics()
private val brokerId = 1
private val metadataCache = new MetadataCache(brokerId)
private val authorizer: Option[Authorizer] = None
- private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
- private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
- private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+ private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
+ private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+ private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
- private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager])
+ private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager])
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
private val time = new MockTime
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 740b28e..50449dc 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -211,8 +211,8 @@ class LogOffsetTest extends BaseRequestTest {
* a race condition) */
@Test
def testFetchOffsetsBeforeWithChangingSegmentSize() {
- val log = EasyMock.niceMock(classOf[Log])
- val logSegment = EasyMock.niceMock(classOf[LogSegment])
+ val log: Log = EasyMock.niceMock(classOf[Log])
+ val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] {
private val value = new AtomicInteger(0)
def answer: Int = value.getAndIncrement()
@@ -228,8 +228,8 @@ class LogOffsetTest extends BaseRequestTest {
* different (simulating a race condition) */
@Test
def testFetchOffsetsBeforeWithChangingSegments() {
- val log = EasyMock.niceMock(classOf[Log])
- val logSegment = EasyMock.niceMock(classOf[LogSegment])
+ val log: Log = EasyMock.niceMock(classOf[Log])
+ val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
EasyMock.expect(log.logSegments).andStubAnswer {
new IAnswer[Iterable[LogSegment]] {
def answer = new Iterable[LogSegment] {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5d92b61..a4fbaf2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -50,9 +50,9 @@ class ReplicaAlterLogDirsThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
- val partitionT1p0 = createMock(classOf[Partition])
- val partitionT1p1 = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val partitionT1p0: Partition = createMock(classOf[Partition])
+ val partitionT1p1: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpochT1p0 = 2
val leaderEpochT1p1 = 5
@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
- val partitionT1p0 = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val partitionT1p0: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 2
val leo = 13
@@ -149,18 +149,18 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
- val futureReplicaLeaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
- val futureReplicaLeaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaT1p0 = createNiceMock(classOf[Replica])
- val replicaT1p1 = createNiceMock(classOf[Replica])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val futureReplicaLeaderEpochsT1p0: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val futureReplicaLeaderEpochsT1p1: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaT1p0: Replica = createNiceMock(classOf[Replica])
+ val replicaT1p1: Replica = createNiceMock(classOf[Replica])
// one future replica mock because our mocking methods return same values for both future replicas
- val futureReplicaT1p0 = createNiceMock(classOf[Replica])
- val futureReplicaT1p1 = createNiceMock(classOf[Replica])
- val partitionT1p0 = createMock(classOf[Partition])
- val partitionT1p1 = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val futureReplicaT1p0: Replica = createNiceMock(classOf[Replica])
+ val futureReplicaT1p1: Replica = createNiceMock(classOf[Replica])
+ val partitionT1p0: Partition = createMock(classOf[Partition])
+ val partitionT1p1: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
val leaderEpoch = 2
@@ -228,14 +228,14 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
- val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replica = createNiceMock(classOf[Replica])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
// one future replica mock because our mocking methods return same values for both future replicas
- val futureReplica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val futureReplica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
val leaderEpoch = 5
@@ -301,13 +301,13 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
- val logManager = createMock(classOf[LogManager])
- val replica = createNiceMock(classOf[Replica])
- val futureReplica = createNiceMock(classOf[Replica])
- val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val futureReplica: Replica = createNiceMock(classOf[Replica])
+ val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
val initialFetchOffset = 100
@@ -356,13 +356,13 @@ class ReplicaAlterLogDirsThreadTest {
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
- val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[kafka.log.LogManager])
- val replica = createNiceMock(classOf[Replica])
- val futureReplica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val futureReplica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
val futureReplicaLeaderEpoch = 1
@@ -439,13 +439,13 @@ class ReplicaAlterLogDirsThreadTest {
//Setup all dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
- val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replica = createNiceMock(classOf[Replica])
- val futureReplica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val futureReplica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
val leaderEpoch = 5
@@ -494,12 +494,12 @@ class ReplicaAlterLogDirsThreadTest {
//Setup all dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
- val logManager = createMock(classOf[LogManager])
- val replica = createNiceMock(classOf[Replica])
- val futureReplica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val futureReplica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
//Stubs
expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
@@ -543,12 +543,12 @@ class ReplicaAlterLogDirsThreadTest {
//Setup all dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
- val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
- val logManager = createMock(classOf[LogManager])
- val replica = createNiceMock(classOf[Replica])
- val futureReplica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val futureReplica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
//Stubs
expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index a370101..c65c254 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -79,7 +79,7 @@ class ReplicaFetcherThreadTest {
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
val config = KafkaConfig.fromProps(props)
- val leaderEndpoint = createMock(classOf[BlockingSend])
+ val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
override def answer(): ClientResponse = {
toFail = true // assert no leader request is sent
@@ -123,7 +123,7 @@ class ReplicaFetcherThreadTest {
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
val config = KafkaConfig.fromProps(props)
- val leaderEndpoint = createMock(classOf[BlockingSend])
+ val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
override def answer(): ClientResponse = {
@@ -162,13 +162,13 @@ class ReplicaFetcherThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 5
@@ -261,7 +261,7 @@ class ReplicaFetcherThreadTest {
def shouldHandleExceptionFromBlockingSend(): Unit = {
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
val config = KafkaConfig.fromProps(props)
- val mockBlockingSend = createMock(classOf[BlockingSend])
+ val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
replay(mockBlockingSend)
@@ -295,12 +295,12 @@ class ReplicaFetcherThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 5
@@ -355,13 +355,13 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 5
val initialLEO = 200
@@ -404,13 +404,13 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpochAtFollower = 5
val leaderEpochAtLeader = 4
@@ -458,13 +458,13 @@ class ReplicaFetcherThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
// Setup all dependencies
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val initialLEO = 200
@@ -529,13 +529,13 @@ class ReplicaFetcherThreadTest {
val config = KafkaConfig.fromProps(props)
// Setup all dependencies
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val initialLEO = 200
@@ -590,13 +590,13 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val initialFetchOffset = 100
val initialLeo = 300
@@ -635,13 +635,13 @@ class ReplicaFetcherThreadTest {
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
- val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createMock(classOf[kafka.log.LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 5
val highWaterMark = 100
@@ -693,13 +693,13 @@ class ReplicaFetcherThreadTest {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all stubs
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createNiceMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createNiceMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createNiceMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
val leaderEpoch = 4
@@ -746,13 +746,13 @@ class ReplicaFetcherThreadTest {
val initialLEO = 100
//Setup all stubs
- val quota = createNiceMock(classOf[ReplicationQuotaManager])
- val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
- val logManager = createNiceMock(classOf[LogManager])
- val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
- val replica = createNiceMock(classOf[Replica])
- val partition = createMock(classOf[Partition])
- val replicaManager = createNiceMock(classOf[ReplicaManager])
+ val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+ val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createNiceMock(classOf[LogManager])
+ val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+ val replica: Replica = createNiceMock(classOf[Replica])
+ val partition: Partition = createMock(classOf[Partition])
+ val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
//Stub return values
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
@@ -796,7 +796,7 @@ class ReplicaFetcherThreadTest {
def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
val config = KafkaConfig.fromProps(props)
- val mockBlockingSend = createMock(classOf[BlockingSend])
+ val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
expect(mockBlockingSend.initiateClose()).andThrow(new IllegalArgumentException()).once()
expect(mockBlockingSend.close()).andThrow(new IllegalStateException()).once()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9b59e71..50b5fa7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -21,7 +21,7 @@ import java.util.{Optional, Properties}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.{Partition, Replica}
-import kafka.log.{Log, LogOffsetSnapshot}
+import kafka.log.{Log, LogManager, LogOffsetSnapshot}
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
@@ -151,7 +151,7 @@ class ReplicaManagerQuotasTest {
// Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
val endOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
- val partition = EasyMock.createMock(classOf[Partition])
+ val partition: Partition = EasyMock.createMock(classOf[Partition])
val offsetSnapshot = LogOffsetSnapshot(
logStartOffset = 0L,
@@ -161,7 +161,7 @@ class ReplicaManagerQuotasTest {
EasyMock.expect(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true))
.andReturn(offsetSnapshot)
- val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+ val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(replicaManager.getPartitionOrException(
EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean()))
.andReturn(partition).anyTimes()
@@ -192,11 +192,11 @@ class ReplicaManagerQuotasTest {
}
def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
- val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
- val scheduler = createNiceMock(classOf[KafkaScheduler])
+ val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+ val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
//Create log which handles both a regular read and a 0 bytes read
- val log = createNiceMock(classOf[Log])
+ val log: Log = createNiceMock(classOf[Log])
expect(log.logStartOffset).andReturn(0L).anyTimes()
expect(log.logEndOffset).andReturn(20L).anyTimes()
expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
@@ -225,7 +225,7 @@ class ReplicaManagerQuotasTest {
replay(log)
//Create log manager
- val logManager = createMock(classOf[kafka.log.LogManager])
+ val logManager: LogManager = createMock(classOf[LogManager])
//Return the same log for each partition as it doesn't matter
expect(logManager.getLog(anyObject(), anyBoolean())).andReturn(Some(log)).anyTimes()
@@ -263,7 +263,7 @@ class ReplicaManagerQuotasTest {
}
def mockQuota(bound: Long): ReplicaQuota = {
- val quota = createMock(classOf[ReplicaQuota])
+ val quota: ReplicaQuota = createMock(classOf[ReplicaQuota])
expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes()
quota
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4401748..c4ca2cb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -148,7 +148,7 @@ class ReplicaManagerTest {
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
- val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+ val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
@@ -594,7 +594,7 @@ class ReplicaManagerTest {
val mockScheduler = new MockScheduler(time)
val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
+ val mockLeaderEpochCache: LeaderEpochFileCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
.andReturn((leaderEpochFromLeader, localLogOffset))
@@ -620,7 +620,7 @@ class ReplicaManagerTest {
}
// Expect to call LogManager.truncateTo exactly once
- val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+ val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition),
@@ -634,7 +634,7 @@ class ReplicaManagerTest {
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
- val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+ val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
aliveBrokerIds.foreach { brokerId =>
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes
@@ -793,7 +793,7 @@ class ReplicaManagerTest {
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
- val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+ val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
aliveBrokerIds.foreach { brokerId =>
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 67d083c..1bc0257 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -105,7 +105,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
@Test
def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0
- val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
+ val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])
class BrokerStateInterceptor() extends BrokerState {
override def newState(newState: BrokerStates): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ecfbd73..d0ff46b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -21,7 +21,7 @@ import java.io.File
import kafka.api._
import kafka.utils._
import kafka.cluster.Replica
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.metrics.Metrics
@@ -71,15 +71,15 @@ class SimpleFetchTest {
@Before
def setUp() {
// create nice mock since we don't particularly care about zkclient calls
- val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.replay(kafkaZkClient)
// create nice mock since we don't particularly care about scheduler calls
- val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
+ val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
EasyMock.replay(scheduler)
// create the log which takes read with either HW max offset or none max offset
- val log = EasyMock.createNiceMock(classOf[Log])
+ val log: Log = EasyMock.createNiceMock(classOf[Log])
EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
@@ -107,7 +107,7 @@ class SimpleFetchTest {
EasyMock.replay(log)
// create the log manager that is aware of this mock log
- val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+ val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
EasyMock.replay(logManager)
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index ff781a2..c46404a 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -50,7 +50,7 @@ class ThrottledChannelExpirationTest {
val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
- val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+ val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
@@ -122,4 +122,4 @@ class ThrottledChannelExpirationTest {
time.sleep(10)
}
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 86a087b..7adc204 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -21,6 +21,7 @@ import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.Replica
+import kafka.log.{Log, LogManager}
import kafka.server._
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
@@ -46,9 +47,9 @@ class OffsetsForLeaderEpochTest {
val request = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), epochRequested))
//Stubs
- val mockLog = createNiceMock(classOf[kafka.log.Log])
- val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
- val logManager = createNiceMock(classOf[kafka.log.LogManager])
+ val mockLog: Log = createNiceMock(classOf[Log])
+ val mockCache: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+ val logManager: LogManager = createNiceMock(classOf[LogManager])
expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
@@ -72,7 +73,7 @@ class OffsetsForLeaderEpochTest {
@Test
def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
- val logManager = createNiceMock(classOf[kafka.log.LogManager])
+ val logManager: LogManager = createNiceMock(classOf[LogManager])
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
replay(logManager)
@@ -95,7 +96,7 @@ class OffsetsForLeaderEpochTest {
@Test
def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
- val logManager = createNiceMock(classOf[kafka.log.LogManager])
+ val logManager: LogManager = createNiceMock(classOf[LogManager])
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
replay(logManager)
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 65273eb..4bf7471 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,9 +17,10 @@
package kafka.utils
-import kafka.server.{KafkaConfig, ReplicaFetcherManager}
+import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager}
import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.log.{Log, LogManager}
import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
@@ -48,16 +49,16 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
@Test
def testUpdateLeaderAndIsr() {
val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
- val log = EasyMock.createMock(classOf[kafka.log.Log])
+ val log: Log = EasyMock.createMock(classOf[Log])
EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
EasyMock.expect(log)
EasyMock.replay(log)
- val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+ val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes()
EasyMock.replay(logManager)
- val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+ val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 39745e5..aca538b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -138,7 +138,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
val topic = "test.topic"
// simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
- val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ val zkMock: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
EasyMock.replay(zkMock)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index e11ded1..7dd3604 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -52,7 +52,7 @@ versions += [
apacheds: "2.0.0-M24",
argparse4j: "0.7.0",
bcpkix: "1.60",
- easymock: "3.6",
+ easymock: "4.0.1",
jackson: "2.9.7",
jetty: "9.4.12.v20180830",
jersey: "2.27",
@@ -73,20 +73,19 @@ versions += [
kafka_11: "1.1.1",
kafka_20: "2.0.0",
lz4: "1.5.0",
- mavenArtifact: "3.5.4",
+ mavenArtifact: "3.6.0",
metrics: "2.2.0",
mockito: "2.23.0",
- // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
- powermock: "2.0.0-beta.5",
+ powermock: "2.0.0-RC.3",
reflections: "0.9.11",
rocksDB: "5.14.2",
scalatest: "3.0.5",
scoverage: "1.3.1",
slf4j: "1.7.25",
snappy: "1.1.7.2",
- zkclient: "0.10",
+ zkclient: "0.11",
zookeeper: "3.4.13",
- zstd: "1.3.5-4"
+ zstd: "1.3.7-1"
]
libs += [