You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/14 18:46:35 UTC
[1/2] kafka git commit: MINOR: Enable a number of xlint scalac warnings
Repository: kafka
Updated Branches:
refs/heads/trunk e39104547 -> 1685e7112
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 6d9ab72..d2dd1e1 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -37,7 +37,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerAdvertiseHostNameAndPortToZK: Unit = {
+ def testBrokerAdvertiseHostNameAndPortToZK(): Unit = {
val advertisedHostName = "routable-host1"
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
@@ -54,7 +54,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value)
}
- def testBrokerAdvertiseListenersToZK: Unit = {
+ def testBrokerAdvertiseListenersToZK(): Unit = {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
props.put("advertised.listeners", "PLAINTEXT://routable-listener:3334")
servers += TestUtils.createServer(KafkaConfig.fromProps(props))
@@ -68,7 +68,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName)
}
- def testBrokerAdvertiseListenersWithCustomNamesToZK: Unit = {
+ def testBrokerAdvertiseListenersWithCustomNamesToZK(): Unit = {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
props.put("listeners", "INTERNAL://:0,EXTERNAL://:0")
props.put("advertised.listeners", "EXTERNAL://external-listener:9999,INTERNAL://internal-listener:10999")
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index 2555a91..262686a 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -25,7 +25,7 @@ import org.junit.Test
class ApiVersionsTest {
@Test
- def testApiVersions {
+ def testApiVersions(): Unit = {
val apiVersions = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions
assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index a7a12eb..b212a74 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -39,7 +39,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
// If required, override properties by mutating the passed Properties object
protected def propertyOverrides(properties: Properties) {}
- def generateConfigs() = {
+ def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = true,
interBrokerSecurityProtocol = Some(securityProtocol),
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index dc30fb2..9d2bb8b 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,14 +26,13 @@ import org.easymock.EasyMock
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
-import kafka.common._
import kafka.admin.{AdminOperationException, AdminUtils}
import org.apache.kafka.common.TopicPartition
import scala.collection.Map
class DynamicConfigChangeTest extends KafkaServerTestHarness {
- def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testConfigChange() {
@@ -174,7 +173,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
- def testProcessNotification {
+ def testProcessNotification(): Unit = {
val props = new Properties()
props.put("a.b", "10")
@@ -230,7 +229,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
- def shouldParseReplicationQuotaProperties {
+ def shouldParseReplicationQuotaProperties(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
val props: Properties = new Properties()
@@ -243,7 +242,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
- def shouldParseWildcardReplicationQuotaProperties {
+ def shouldParseWildcardReplicationQuotaProperties(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
val props: Properties = new Properties()
@@ -258,7 +257,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
- def shouldParseReplicationQuotaReset {
+ def shouldParseReplicationQuotaReset(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
val props: Properties = new Properties()
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 0deb26d..a7817ad 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
class EdgeCaseRequestTest extends KafkaServerTestHarness {
- def generateConfigs() = {
+ def generateConfigs = {
val props = TestUtils.createBrokerConfig(1, zkConnect)
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
List(KafkaConfig.fromProps(props))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index df8a6d7..1b801de 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -121,7 +121,7 @@ class KafkaConfigTest {
}
@Test
- def testLogRetentionValid {
+ def testLogRetentionValid(): Unit = {
val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 4ebb17b..3497cc3 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -65,7 +65,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
- def testLeaderElectionAndEpoch {
+ def testLeaderElectionAndEpoch(): Unit = {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index a5f0dba..e1d4d75 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -99,7 +99,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
- def testHWCheckpointNoFailuresSingleLogSegment {
+ def testHWCheckpointNoFailuresSingleLogSegment(): Unit = {
val numMessages = 2L
sendMessages(numMessages.toInt)
@@ -116,7 +116,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
- def testHWCheckpointWithFailuresSingleLogSegment {
+ def testHWCheckpointWithFailuresSingleLogSegment(): Unit = {
var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
assertEquals(0L, hwFile1.read.getOrElse(topicPartition, 0L))
@@ -167,7 +167,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
- def testHWCheckpointNoFailuresMultipleLogSegments {
+ def testHWCheckpointNoFailuresMultipleLogSegments(): Unit = {
sendMessages(20)
val hw = 20L
// give some time for follower 1 to record leader HW of 600
@@ -183,7 +183,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
- def testHWCheckpointWithFailuresMultipleLogSegments {
+ def testHWCheckpointWithFailuresMultipleLogSegments(): Unit = {
var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
sendMessages(2)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index a25569f..c764369 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -35,7 +35,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerCreatesZKChroot {
+ def testBrokerCreatesZKChroot(): Unit = {
val brokerId = 0
val zookeeperChroot = "/kafka-chroot-for-unittest"
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
@@ -48,7 +48,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
- def testConflictBrokerStartupWithSamePort {
+ def testConflictBrokerStartupWithSamePort(): Unit = {
// Create and start first broker
val brokerId1 = 0
val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect)
@@ -67,7 +67,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
- def testConflictBrokerRegistration {
+ def testConflictBrokerRegistration(): Unit = {
// Try starting a broker with the a conflicting broker id.
// This shouldn't affect the existing broker registration.
@@ -90,7 +90,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerSelfAware {
+ def testBrokerSelfAware(): Unit = {
val brokerId = 0
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
server = TestUtils.createServer(KafkaConfig.fromProps(props))
@@ -101,7 +101,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerStateRunningAfterZK {
+ def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0
val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 5e069a1..ecc13d9 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -263,7 +263,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
/**
* Simulates how the Replica Fetcher Thread requests leader offsets for epochs
*/
- private class TestFetcherThread(sender: BlockingSend) extends Logging {
+ private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
val request = new OffsetsForLeaderEpochRequest.Builder(toJavaFormat(partitions))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 14f0114..4976f52 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -448,7 +448,7 @@ object TestUtils extends Logging {
var cur: Iterator[T] = null
val topIterator = s.iterator
- def hasNext() : Boolean = {
+ def hasNext: Boolean = {
while (true) {
if (cur == null) {
if (topIterator.hasNext)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 0101735..6280d97 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -72,7 +72,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
}
@Test
- def testEphemeralNodeCleanup = {
+ def testEphemeralNodeCleanup(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled())
@@ -100,7 +100,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
* Tests basic creation
*/
@Test
- def testZkWatchedEphemeral = {
+ def testZkWatchedEphemeral(): Unit = {
testCreation("/zwe-test")
testCreation("/zwe-test-parent/zwe-test")
}
@@ -128,7 +128,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
* session.
*/
@Test
- def testOverlappingSessions = {
+ def testOverlappingSessions(): Unit = {
val path = "/zwe-test"
val zk1 = zkUtils.zkConnection.getZookeeper
@@ -156,7 +156,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
* Tests if succeeds with znode from the same session
*/
@Test
- def testSameSession = {
+ def testSameSession(): Unit = {
val path = "/zwe-test"
val zk = zkUtils.zkConnection.getZookeeper
// Creates znode for path in the first session
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 04f8aaf..07978b9 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -30,7 +30,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
@Test
- def testCreatePersistentPathThrowsException {
+ def testCreatePersistentPathThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
@@ -46,7 +46,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testCreatePersistentPath {
+ def testCreatePersistentPath(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
@@ -56,7 +56,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testMakeSurePersistsPathExistsThrowsException {
+ def testMakeSurePersistsPathExistsThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
@@ -70,7 +70,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testMakeSurePersistsPathExists {
+ def testMakeSurePersistsPathExists(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
@@ -80,7 +80,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateEphemeralPathThrowsException {
+ def testCreateEphemeralPathThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
@@ -94,7 +94,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateEphemeralPathExists {
+ def testCreateEphemeralPathExists(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
@@ -104,7 +104,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testCreatePersistentSequentialThrowsException {
+ def testCreatePersistentSequentialThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
@@ -119,7 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
- def testCreatePersistentSequentialExists {
+ def testCreatePersistentSequentialExists(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
[2/2] kafka git commit: MINOR: Enable a number of xlint scalac warnings
Posted by jg...@apache.org.
MINOR: Enable a number of xlint scalac warnings
Update the code where possible to fix the warnings. The unused
warning introduced in Scala 2.12 is quite handy and provides
a reason to compile with Scala 2.12.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3464 from ijuma/scala-xlint
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1685e711
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1685e711
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1685e711
Branch: refs/heads/trunk
Commit: 1685e7112c5d4dc723ffcfa219febaed045b6426
Parents: e391045
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Jul 14 11:44:42 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jul 14 11:44:42 2017 -0700
----------------------------------------------------------------------
build.gradle | 22 ++++++++++++++-
.../kafka/admin/BrokerApiVersionsCommand.scala | 3 +--
.../kafka/admin/DeleteRecordsCommand.scala | 2 --
.../main/scala/kafka/admin/TopicCommand.scala | 2 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 2 --
.../kafka/api/ControlledShutdownResponse.scala | 2 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 1 -
.../scala/kafka/api/TopicMetadataRequest.scala | 2 +-
core/src/main/scala/kafka/cluster/Replica.scala | 2 --
.../scala/kafka/common/TopicAndPartition.scala | 1 -
.../kafka/consumer/ConsumerConnector.scala | 2 +-
.../kafka/consumer/ConsumerFetcherManager.scala | 2 --
.../scala/kafka/consumer/ConsumerIterator.scala | 2 +-
.../main/scala/kafka/consumer/KafkaStream.scala | 2 +-
.../kafka/consumer/PartitionAssignor.scala | 1 -
.../controller/ControllerChannelManager.scala | 20 ++------------
.../kafka/controller/KafkaController.scala | 2 +-
...nsactionMarkerRequestCompletionHandler.scala | 10 +++----
.../kafka/javaapi/TopicMetadataRequest.scala | 2 +-
.../scala/kafka/javaapi/producer/Producer.scala | 2 +-
.../main/scala/kafka/log/AbstractIndex.scala | 6 ++---
.../scala/kafka/log/LogCleanerManager.scala | 2 +-
core/src/main/scala/kafka/log/OffsetIndex.scala | 5 ++--
core/src/main/scala/kafka/log/TimeIndex.scala | 9 +++----
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 4 +--
.../main/scala/kafka/producer/Producer.scala | 2 +-
.../kafka/producer/async/EventHandler.scala | 2 +-
.../producer/async/ProducerSendThread.scala | 2 +-
.../main/scala/kafka/security/auth/Acl.scala | 21 +++++++--------
.../scala/kafka/server/ClientQuotaManager.scala | 2 +-
.../scala/kafka/utils/IteratorTemplate.scala | 4 +--
.../main/scala/kafka/utils/KafkaScheduler.scala | 4 +--
.../admin/BrokerApiVersionsCommandTest.scala | 2 +-
.../kafka/api/AdminClientIntegrationTest.scala | 2 +-
.../kafka/api/ConsumerBounceTest.scala | 2 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 28 +++++++++-----------
.../kafka/api/EndToEndClusterIdTest.scala | 2 +-
.../kafka/api/ProducerBounceTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 2 +-
.../api/RackAwareAutoTopicCreationTest.scala | 2 +-
.../api/SaslEndToEndAuthorizationTest.scala | 2 +-
.../kafka/api/TransactionsBounceTest.scala | 13 +++++----
.../tools/MirrorMakerIntegrationTest.scala | 4 +--
.../unit/kafka/admin/AddPartitionsTest.scala | 12 ++++-----
.../kafka/admin/DeleteConsumerGroupTest.scala | 2 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 3 ++-
.../kafka/admin/ListConsumerGroupTest.scala | 3 ++-
.../admin/ReassignPartitionsClusterTest.scala | 4 +--
.../admin/ReassignPartitionsCommandTest.scala | 2 +-
.../unit/kafka/cluster/BrokerEndPointTest.scala | 2 +-
.../ZkNodeChangeNotificationListenerTest.scala | 4 +--
.../kafka/consumer/ConsumerIteratorTest.scala | 2 +-
.../ZookeeperConsumerConnectorTest.scala | 4 +--
.../controller/ControllerFailoverTest.scala | 2 +-
.../kafka/integration/AutoOffsetResetTest.scala | 2 +-
.../unit/kafka/integration/FetcherTest.scala | 2 +-
...MetricsDuringTopicCreationDeletionTest.scala | 2 +-
.../kafka/integration/MinIsrConfigTest.scala | 2 +-
.../kafka/integration/PrimitiveApiTest.scala | 2 +-
.../kafka/integration/TopicMetadataTest.scala | 22 +++++++--------
.../integration/UncleanLeaderElectionTest.scala | 14 +++++-----
.../ZookeeperConsumerConnectorTest.scala | 3 ++-
.../message/BaseMessageSetTestCases.scala | 2 +-
.../scala/unit/kafka/log/TimeIndexTest.scala | 2 +-
.../unit/kafka/log/TransactionIndexTest.scala | 4 +--
.../scala/unit/kafka/message/MessageTest.scala | 2 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 2 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 2 +-
.../unit/kafka/producer/SyncProducerTest.scala | 2 +-
.../security/auth/ZkAuthorizationTest.scala | 2 +-
.../unit/kafka/server/AdvertiseBrokerTest.scala | 6 ++---
.../unit/kafka/server/ApiVersionsTest.scala | 2 +-
.../unit/kafka/server/BaseRequestTest.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 11 ++++----
.../unit/kafka/server/EdgeCaseRequestTest.scala | 2 +-
.../unit/kafka/server/KafkaConfigTest.scala | 2 +-
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../unit/kafka/server/LogRecoveryTest.scala | 8 +++---
.../unit/kafka/server/ServerStartupTest.scala | 10 +++----
.../epoch/LeaderEpochIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 8 +++---
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 16 +++++------
83 files changed, 186 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a493493..2921372 100644
--- a/build.gradle
+++ b/build.gradle
@@ -309,9 +309,29 @@ subprojects {
"-feature",
"-language:postfixOps",
"-language:implicitConversions",
- "-language:existentials"
+ "-language:existentials",
+ "-Xlint:by-name-right-associative",
+ "-Xlint:delayedinit-select",
+ "-Xlint:doc-detached",
+ "-Xlint:missing-interpolator",
+ "-Xlint:nullary-override",
+ "-Xlint:nullary-unit",
+ "-Xlint:option-implicit",
+ "-Xlint:package-object-classes",
+ "-Xlint:poly-implicit-overload",
+ "-Xlint:private-shadow",
+ "-Xlint:stars-align",
+ "-Xlint:type-parameter-shadow",
+ "-Xlint:unsound-match",
]
+ if (versions.baseScala != '2.11') {
+ scalaCompileOptions.additionalParameters += [
+ "-Xlint:constant",
+ "-Xlint:unused"
+ ]
+ }
+
configure(scalaCompileOptions.forkOptions) {
memoryMaximumSize = '1g'
jvmArgs = ['-Xss2m'] + maxPermSizeArgs
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 4aea3c0..b25a8da 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -24,7 +24,6 @@ import kafka.utils.CommandLineUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
-import org.apache.kafka.common.Node
import scala.util.{Failure, Success}
@@ -41,7 +40,7 @@ object BrokerApiVersionsCommand {
val opts = new BrokerVersionCommandOptions(args)
val adminClient = createAdminClient(opts)
adminClient.awaitBrokers()
- var brokerMap = adminClient.listAllBrokerVersionInfo()
+ val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.foreach { case (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 71dae8a..1a3b116 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -28,8 +28,6 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
-import scala.util.{Failure, Success}
-
/**
* A command for delete records of the given partitions down to the specified offset.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9e516b0..882fe21 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -366,7 +366,7 @@ object TopicCommand extends Logging {
}
}
- def askToProceed: Unit = {
+ def askToProceed(): Unit = {
println("Are you sure you want to continue? [y/n]")
if (!Console.readLine().equalsIgnoreCase("y")) {
println("Ending your session")
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 71153d1..e1d6e02 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -17,8 +17,6 @@
package kafka.admin
-import java.util.concurrent.LinkedBlockingQueue
-
import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index e0a03e8..15992d2 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -44,7 +44,7 @@ case class ControlledShutdownResponse(correlationId: Int,
error: Errors = Errors.NONE,
partitionsRemaining: Set[TopicAndPartition])
extends RequestOrResponse() {
- def sizeInBytes(): Int ={
+ def sizeInBytes: Int = {
var size =
4 /* correlation id */ +
2 /* error code */ +
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ceed815..1f23e40 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -22,7 +22,6 @@ import kafka.api.ApiUtils._
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import kafka.network.RequestChannel
-import kafka.message.MessageSet
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import java.util
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 6bbcab5..032ff77 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -47,7 +47,7 @@ case class TopicMetadataRequest(versionId: Short,
topics.foreach(topic => writeShortString(buffer, topic))
}
- def sizeInBytes(): Int = {
+ def sizeInBytes: Int = {
2 + /* version id */
4 + /* correlation id */
shortStringLength(clientId) + /* client id */
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8f08089..183dc25 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -22,8 +22,6 @@ import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
import org.apache.kafka.common.utils.Time
class Replica(val brokerId: Int,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 35b6bcd..4c94c73 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,7 +1,6 @@
package kafka.common
import kafka.cluster.{Partition, Replica}
-import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 46fbab7..f6d4a74 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -79,7 +79,7 @@ trait ConsumerConnector {
/**
* KAFKA-1743: This method added for backward compatibility.
*/
- def commitOffsets
+ def commitOffsets()
/**
* Commit offsets from an external offsets map.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 51a7a04..7cccfe1 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -45,7 +45,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds),
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
- private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@@ -126,7 +125,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
inLock(lock) {
partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, tpi.partitionId), tpi)).toMap
- this.cluster = cluster
noLeaderPartitionSet ++= topicInfos.map(tpi => new TopicPartition(tpi.topic, tpi.partitionId))
cond.signalAll()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 9ca2253..f096c55 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -17,7 +17,7 @@
package kafka.consumer
-import kafka.utils.{IteratorTemplate, Logging, CoreUtils}
+import kafka.utils.{IteratorTemplate, Logging}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index faba42f..914cedd 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -37,7 +37,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
/**
* Create an iterator over messages in the stream.
*/
- def iterator(): ConsumerIterator[K,V] = iter
+ def iterator: ConsumerIterator[K,V] = iter
/**
* This method clears the queue being iterated during the consumer rebalancing. This is mainly
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 52c3d8b..5d4fb8b 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -17,7 +17,6 @@
package kafka.consumer
-import org.I0Itec.zkclient.ZkClient
import kafka.common.TopicAndPartition
import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 369da05..ee8fa1e 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -502,33 +502,17 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
-class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => Unit = null,
- var updateMetadataResponseCallback: AbstractResponse => Unit = null,
- var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = null)
+class Callbacks private (var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit)
object Callbacks {
class CallbackBuilder {
- var leaderAndIsrResponseCbk: AbstractResponse => Unit = null
- var updateMetadataResponseCbk: AbstractResponse => Unit = null
var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
- def leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
- leaderAndIsrResponseCbk = cbk
- this
- }
-
- def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
- updateMetadataResponseCbk = cbk
- this
- }
-
def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = {
stopReplicaResponseCbk = cbk
this
}
- def build: Callbacks = {
- new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
- }
+ def build: Callbacks = new Callbacks(stopReplicaResponseCbk)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0ba412b..ff47f14 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -613,7 +613,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
def incrementControllerEpoch() = {
try {
- var newControllerEpoch = controllerContext.epoch + 1
+ val newControllerEpoch = controllerContext.epoch + 1
val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
if(!updateSucceeded)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 54960b9..19c37fa 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.WriteTxnMarkersResponse
import scala.collection.mutable
-import collection.JavaConversions._
+import scala.collection.JavaConverters._
class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnStateManager: TransactionStateManager,
@@ -41,7 +41,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val correlation = requestHeader.correlationId
trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected")
- for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+ for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
@@ -82,7 +82,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnMarker.producerEpoch,
txnMarker.transactionResult,
txnMarker.coordinatorEpoch,
- txnMarker.partitions.toSet)
+ txnMarker.partitions.asScala.toSet)
}
}
}
@@ -91,7 +91,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
- for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+ for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
val errors = writeTxnMarkerResponse.errors(txnMarker.producerId)
@@ -132,7 +132,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
abortSending = true
} else {
txnMetadata synchronized {
- for ((topicPartition: TopicPartition, error: Errors) <- errors) {
+ for ((topicPartition, error) <- errors.asScala) {
error match {
case Errors.NONE =>
txnMetadata.removePartition(topicPartition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index efd5405..fdb14cb 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -39,7 +39,7 @@ class TopicMetadataRequest(val versionId: Short,
def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
- def sizeInBytes: Int = underlying.sizeInBytes()
+ def sizeInBytes: Int = underlying.sizeInBytes
override def toString: String = {
describe(true)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 44f9245..b0b40b9 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -48,5 +48,5 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
* Close API to close the producer pool connections to all Kafka brokers. Also closes
* the zookeeper client connection if one exists
*/
- def close = underlying.close
+ def close() = underlying.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index a125676..bfc6828 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -37,8 +37,8 @@ import scala.math.ceil
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
-abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean)
- extends Logging {
+abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
+ val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
protected def entrySize: Int
@@ -109,7 +109,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
/* Windows won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS)
- forceUnmap(mmap);
+ forceUnmap(mmap)
try {
raf.setLength(roundedNewSize)
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 6e0ebfb..4a4a59f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.common.LogCleaningAbortedException
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
+import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e4939e8..53c18fe 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -48,8 +48,9 @@ import kafka.common.InvalidOffsetException
* All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*/
-class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
- extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) {
+// Avoid shadowing mutable `file` in AbstractIndex
+class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
+ extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
override def entrySize = 8
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 19ab71a..6c9c32b 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -49,11 +49,9 @@ import org.apache.kafka.common.record.RecordBatch
* No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
*
*/
-class TimeIndex(file: File,
- baseOffset: Long,
- maxIndexSize: Int = -1,
- writable: Boolean = true)
- extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize, writable) with Logging {
+// Avoid shadowing mutable file in AbstractIndex
+class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
+ extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) with Logging {
override def entrySize = 12
@@ -206,5 +204,4 @@ class TimeIndex(file: File,
"Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
" bytes which is not positive or not a multiple of 12.")
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index ca623ae..1894213 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging {
* @param tags Additional attributes which mBean will have.
* @return Sanitized metric name object.
*/
- private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+ private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
@@ -52,7 +52,7 @@ trait KafkaMetricsGroup extends Logging {
}
- private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+ private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
val nameBuilder: StringBuilder = new StringBuilder
nameBuilder.append(group)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 2d2bfdb..d6cf4c8 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -132,7 +132,7 @@ class Producer[K,V](val config: ProducerConfig,
KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
if (producerSendThread != null)
producerSendThread.shutdown
- eventHandler.close
+ eventHandler.close()
info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/async/EventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala
index 3a17bfb..44fb1eb 100644
--- a/core/src/main/scala/kafka/producer/async/EventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/EventHandler.scala
@@ -33,5 +33,5 @@ trait EventHandler[K,V] {
/**
* Cleans up and shuts down the event handler
*/
- def close
+ def close(): Unit
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 79ed1b8..0377093 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -53,7 +53,7 @@ class ProducerSendThread[K,V](val threadName: String,
}
}
- def shutdown = {
+ def shutdown(): Unit = {
info("Begin shutting down ProducerSendThread")
queue.put(shutdownCommand)
shutdownLatch.await
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index f99a088..b84d75c 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -56,21 +56,20 @@ object Acl {
if (aclJson == null || aclJson.isEmpty)
return collection.immutable.Set.empty[Acl]
- var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
- Json.parseFull(aclJson).foreach { m =>
+ Json.parseFull(aclJson).toSet[Any].flatMap { m =>
val aclMap = m.asInstanceOf[Map[String, Any]]
//the acl json version.
require(aclMap(VersionKey) == CurrentVersion)
- val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
- aclSet.foreach(item => {
- val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
- val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
- val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
- val host: String = item(HostsKey).asInstanceOf[String]
- acls += new Acl(principal, permissionType, host, operation)
- })
+ val aclSet = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
+ aclSet.map { item =>
+ val principal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
+ val permissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
+ val operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
+ val host = item(HostsKey).asInstanceOf[String]
+ new Acl(principal, permissionType, host, operation)
+ }
}
- acls.toSet
+
}
def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5c85eef..3970a4b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -215,7 +215,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// Compute the delay
val clientQuotaEntity = clientSensors.quotaEntity
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
- throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
+ throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/utils/IteratorTemplate.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
index 17c152d..7cd161e 100644
--- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala
+++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
@@ -42,12 +42,12 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T
}
def peek(): T = {
- if(!hasNext())
+ if(!hasNext)
throw new NoSuchElementException()
nextItem
}
- def hasNext(): Boolean = {
+ def hasNext: Boolean = {
if(state == FAILED)
throw new IllegalStateException("Iterator is in failed state")
state match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 8e130cf..d20fdd7 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -127,8 +127,8 @@ class KafkaScheduler(val threads: Int,
}
}
- private def ensureRunning = {
- if(!isStarted)
+ private def ensureRunning(): Unit = {
+ if (!isStarted)
throw new IllegalStateException("Kafka scheduler is not running.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 35bdded..00a7c9f 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -30,7 +30,7 @@ import org.junit.Test
class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
- def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+ def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
@Test(timeout=120000)
def checkBrokerApiVersionCommandOutput() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 4c74bca..012f254 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -364,7 +364,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
client.close()
}
- override def generateConfigs() = {
+ override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach { config =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index d146e9d..27cafd7 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -59,7 +59,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- override def generateConfigs() = {
+ override def generateConfigs = {
FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, serverConfig))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 3866cc1..3376d23 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -187,14 +187,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Tests the ability of producing and consuming with the appropriate ACLs set.
*/
@Test
- def testProduceConsumeViaAssign {
+ def testProduceConsumeViaAssign(): Unit = {
setAclsAndProduce()
consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head, numRecords)
}
@Test
- def testProduceConsumeViaSubscribe {
+ def testProduceConsumeViaSubscribe(): Unit = {
setAclsAndProduce()
consumers.head.subscribe(List(topic).asJava)
consumeRecords(this.consumers.head, numRecords)
@@ -215,12 +215,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* isn't set.
*/
@Test(expected = classOf[TimeoutException])
- def testNoProduceWithoutDescribeAcl {
+ def testNoProduceWithoutDescribeAcl(): Unit = {
sendRecords(numRecords, tp)
}
@Test
- def testNoProduceWithDescribeAcl {
+ def testNoProduceWithDescribeAcl(): Unit = {
AclCommand.main(describeAclArgs)
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
@@ -239,7 +239,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* ACL set.
*/
@Test(expected = classOf[KafkaException])
- def testNoConsumeWithoutDescribeAclViaAssign {
+ def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
noConsumeWithoutDescribeAclSetup
consumers.head.assign(List(tp).asJava)
// the exception is expected when the consumer attempts to lookup offsets
@@ -247,14 +247,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
@Test(expected = classOf[TimeoutException])
- def testNoConsumeWithoutDescribeAclViaSubscribe {
+ def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
noConsumeWithoutDescribeAclSetup
consumers.head.subscribe(List(topic).asJava)
// this should timeout since the consumer will not be able to fetch any metadata for the topic
consumeRecords(this.consumers.head, timeout = 3000)
}
- private def noConsumeWithoutDescribeAclSetup {
+ private def noConsumeWithoutDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs)
AclCommand.main(groupAclArgs)
servers.foreach { s =>
@@ -270,13 +270,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
}
}
-
- /**
- * Tests that a consumer fails to consume messages without the appropriate
- * ACL set.
- */
+
@Test
- def testNoConsumeWithDescribeAclViaAssign {
+ def testNoConsumeWithDescribeAclViaAssign(): Unit = {
noConsumeWithDescribeAclSetup
consumers.head.assign(List(tp).asJava)
@@ -290,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
@Test
- def testNoConsumeWithDescribeAclViaSubscribe {
+ def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
noConsumeWithDescribeAclSetup
consumers.head.subscribe(List(topic).asJava)
@@ -303,7 +299,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
}
- private def noConsumeWithDescribeAclSetup {
+ private def noConsumeWithDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs)
AclCommand.main(groupAclArgs)
servers.foreach { s =>
@@ -318,7 +314,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* ACL set.
*/
@Test
- def testNoGroupAcl {
+ def testNoGroupAcl(): Unit = {
AclCommand.main(produceAclArgs)
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 6a4c552..6c61cd9 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
val topicAndPartition = new TopicAndPartition(topic, part)
this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter")
- override def generateConfigs() = {
+ override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_.putAll(serverConfig))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 9fe0e5c..aa92f40 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -51,7 +51,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
//
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
- override def generateConfigs() = {
+ override def generateConfigs = {
FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
.map(KafkaConfig.fromProps(_, overridingProps))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 0c44ca9..49a096a 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -49,7 +49,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
- def generateConfigs() =
+ def generateConfigs =
TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index a2f2041..cb5262d 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -35,7 +35,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)
- def generateConfigs() =
+ def generateConfigs =
(0 until numServers) map { node =>
TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
} map (KafkaConfig.fromProps(_, overridingProps))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index 7e549c8..cc9ee3e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -53,7 +53,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
* the second one connects ok, but fails to consume messages due to the ACL.
*/
@Test(timeout = 15000)
- def testTwoConsumersWithDifferentSaslCredentials {
+ def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
setAclsAndProduce()
val consumer1 = consumers.head
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 0e216a2..810f481 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -22,13 +22,12 @@ import java.util.Properties
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{Ignore, Test}
+import org.junit.Test
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.junit.Assert._
@@ -67,7 +66,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
//
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
- override def generateConfigs() = {
+ override def generateConfigs = {
FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
.map(KafkaConfig.fromProps(_, overridingProps))
}
@@ -105,7 +104,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
!shouldAbort), new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
}
trace(s"Sent ${records.size} messages. Committing offsets.")
- producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
+ producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroup)
if (shouldAbort) {
trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
@@ -150,7 +149,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId,
securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
- consumer.subscribe(topics)
+ consumer.subscribe(topics.asJava)
consumer
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index b7b1a12..1f9851d 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -30,8 +30,8 @@ import org.junit.Test
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
- override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
- .map(KafkaConfig.fromProps(_, new Properties()))
+ override def generateConfigs: Seq[KafkaConfig] =
+ TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
@Test
def testCommaSeparatedRegex(): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index d08552e..9bc362c 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -64,7 +64,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testTopicDoesNotExist {
+ def testTopicDoesNotExist(): Unit = {
try {
AdminUtils.addPartitions(zkUtils, "Blah", 1)
fail("Topic should not exist")
@@ -74,7 +74,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testWrongReplicaCount {
+ def testWrongReplicaCount(): Unit = {
try {
AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
@@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testIncrementPartitions {
+ def testIncrementPartitions(): Unit = {
AdminUtils.addPartitions(zkUtils, topic1, 3)
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
@@ -111,7 +111,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testManualAssignmentOfReplicas {
+ def testManualAssignmentOfReplicas(): Unit = {
AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testReplicaPlacementAllServers {
+ def testReplicaPlacementAllServers(): Unit = {
AdminUtils.addPartitions(zkUtils, topic3, 7)
// read metadata from a broker and verify the new topic partitions exist
@@ -166,7 +166,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testReplicaPlacementPartialServers {
+ def testReplicaPlacementPartialServers(): Unit = {
AdminUtils.addPartitions(zkUtils, topic2, 3)
// read metadata from a broker and verify the new topic partitions exist
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index aa202bc..a8955f5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -28,7 +28,7 @@ import kafka.integration.KafkaServerTestHarness
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class DeleteConsumerGroupTest extends KafkaServerTestHarness {
- def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
+ def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
@Test
def testGroupWideDeleteInZK() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 2c09cc4..7000308 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -47,7 +47,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
private var consumerGroupExecutor: ConsumerGroupExecutor = _
// configure the servers and clients
- override def generateConfigs() = {
+ override def generateConfigs = {
TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
KafkaConfig.fromProps(props)
}
@@ -274,6 +274,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
}
}
+ @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
private def createOldConsumer(): Unit = {
val consumerProps = new Properties
consumerProps.setProperty("group.id", group)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index c03be66..6727fad 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -40,7 +40,8 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
val props = new Properties
// configure the servers and clients
- override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ override def generateConfigs =
+ TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
@Before
override def setUp() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index e3b0aa8..dadd002 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -30,7 +30,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
var servers: Seq[KafkaServer] = null
val topicName = "my-topic"
val delayMs = 1000
- def zkUpdateDelay = {Thread.sleep(delayMs)}
+ def zkUpdateDelay(): Unit = Thread.sleep(delayMs)
@Before
override def setUp() {
@@ -49,7 +49,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
@Test
- def shouldMoveSinglePartition {
+ def shouldMoveSinglePartition(): Unit = {
//Given a single replica on server 100
startBrokers(Seq(100, 101))
val partition = 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 9e23983..c75c28a 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -133,7 +133,7 @@ class ReassignPartitionsCommandTest extends Logging {
case "topic2" =>
assertEquals("0:101,0:102", configChange.get(LeaderReplicationThrottledReplicasProp))
assertEquals("0:100", configChange.get(FollowerReplicationThrottledReplicasProp))
- case _ => fail("Unexpected topic $topic")
+ case _ => fail(s"Unexpected topic $topic")
}
calls += 1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 20b7e25..2578243 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -67,7 +67,7 @@ class BrokerEndPointTest extends Logging {
}
@Test
- def testFromJsonV2 {
+ def testFromJsonV2(): Unit = {
val brokerInfoStr = """{
"version":2,
"host":"localhost",
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index f7dd40f..368ee0d 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -18,12 +18,12 @@ package kafka.common
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, ZkUtils}
+import kafka.utils.TestUtils
import org.junit.Test
class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
- override def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ override def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testProcessNotification() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 0d38e10..5571e03 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -38,7 +38,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
val numNodes = 1
- def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
+ def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
val messages = new mutable.HashMap[Int, Seq[Message]]
val topic = "topic"
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index df80d1d..bbf05e4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -45,8 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
- override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
- .map(KafkaConfig.fromProps(_, overridingProps))
+ override def generateConfigs =
+ TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
val group = "group1"
val consumer0 = "consumer0"
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 13b7285..446d8ae 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -40,7 +40,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
val metrics = new Metrics()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
- override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
+ override def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 2221d90..fb76ca1 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -31,7 +31,7 @@ import org.junit.Assert._
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
- def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
val topic = "test_topic"
val group = "default_group"
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 6076089..f23225c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -32,7 +32,7 @@ import kafka.utils.TestUtils
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class FetcherTest extends KafkaServerTestHarness {
val numNodes = 1
- def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
+ def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 60a6fb6..bec5026 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -48,7 +48,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
@volatile private var running = true
- override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
+ override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
index 3977601..455bbde 100644
--- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
@@ -27,7 +27,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
- def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+ def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@Test
def testDefaultKafkaConfig() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index ff573bc..bc0b81a 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.TopicPartition
class PrimitiveApiTest extends ProducerConsumerTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testFetchRequestCanProperlySerialize() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 07af590..66103cc 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,14 +17,12 @@
package kafka.integration
-import java.io.File
-
import kafka.admin.AdminUtils
import kafka.api.TopicMetadataResponse
import kafka.client.ClientUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
@@ -59,7 +57,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testBasicTopicMetadata {
+ def testBasicTopicMetadata(): Unit = {
// create topic
val topic = "test"
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
@@ -77,7 +75,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testGetAllTopicMetadata {
+ def testGetAllTopicMetadata(): Unit = {
// create topic
val topic1 = "testGetAllTopicMetadata1"
val topic2 = "testGetAllTopicMetadata2"
@@ -102,7 +100,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testAutoCreateTopic {
+ def testAutoCreateTopic(): Unit = {
// auto create topic
val topic = "testAutoCreateTopic"
var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
@@ -129,7 +127,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testAutoCreateTopicWithInvalidReplication {
+ def testAutoCreateTopicWithInvalidReplication(): Unit = {
val adHocProps = createBrokerConfig(2, zkConnect)
// Set default replication higher than the number of live brokers
adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
@@ -152,7 +150,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testAutoCreateTopicWithCollision {
+ def testAutoCreateTopicWithCollision(): Unit = {
// auto create topic
val topic1 = "testAutoCreate_Topic"
val topic2 = "testAutoCreate.Topic"
@@ -212,7 +210,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testIsrAfterBrokerShutDownAndJoinsBack {
+ def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
val numBrokers = 2 //just 2 brokers are enough for the test
// start adHoc brokers
@@ -260,12 +258,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
- def testAliveBrokerListWithNoTopics {
+ def testAliveBrokerListWithNoTopics(): Unit = {
checkMetadata(Seq(server1), 1)
}
@Test
- def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+ def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup(): Unit = {
adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
checkMetadata(adHocServers, numConfigs - 1)
@@ -278,7 +276,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
@Test
- def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+ def testAliveBrokersListWithNoTopicsAfterABrokerShutdown(): Unit = {
adHocServers = adHocConfigs.map(p => createServer(p))
checkMetadata(adHocServers, numConfigs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 25ed480..24421d0 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -102,7 +102,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
- def testUncleanLeaderElectionEnabled {
+ def testUncleanLeaderElectionEnabled(): Unit = {
// enable unclean leader election
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
@@ -116,7 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@Test
@Ignore // Should be re-enabled after KAFKA-3096 is fixed
- def testUncleanLeaderElectionDisabled {
+ def testUncleanLeaderElectionDisabled(): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
@@ -127,7 +127,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
- def testUncleanLeaderElectionEnabledByTopicOverride {
+ def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = {
// disable unclean leader election globally, but enable for our specific test topic
configProps1.put("unclean.leader.election.enable", "false")
configProps2.put("unclean.leader.election.enable", "false")
@@ -144,7 +144,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@Test
@Ignore // Should be re-enabled after KAFKA-3096 is fixed
- def testCleanLeaderElectionDisabledByTopicOverride {
+ def testCleanLeaderElectionDisabledByTopicOverride(): Unit = {
// enable unclean leader election globally, but disable for our specific test topic
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
@@ -160,7 +160,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
- def testUncleanLeaderElectionInvalidTopicOverride {
+ def testUncleanLeaderElectionInvalidTopicOverride(): Unit = {
startBrokers(Seq(configProps1))
// create topic with an invalid value for unclean leader election
@@ -172,7 +172,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
}
- def verifyUncleanLeaderElectionEnabled {
+ def verifyUncleanLeaderElectionEnabled(): Unit = {
// wait until leader is elected
val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
@@ -205,7 +205,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
assertEquals(List("first", "third"), consumeAllMessages(topic))
}
- def verifyUncleanLeaderElectionDisabled {
+ def verifyUncleanLeaderElectionDisabled(): Unit = {
// wait until leader is elected
val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 7d8e0c2..2a0525b 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -45,7 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
- def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+ def generateConfigs =
+ TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
val group = "group1"
val consumer1 = "consumer1"
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index a53602d..199bbbd 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
@Test
- def testWrittenEqualsRead {
+ def testWrittenEqualsRead(): Unit = {
val messageSet = createMessageSet(messages)
assertEquals(messages.toSeq, messageSet.asScala.map(m => m.message))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
index bc60c72..c6112a1 100644
--- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -35,7 +35,7 @@ class TimeIndexTest extends JUnitSuite {
@Before
def setup() {
- this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
+ this.idx = new TimeIndex(nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 16173eb..9b90e91 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -30,13 +30,13 @@ class TransactionIndexTest extends JUnitSuite {
val offset = 0L
@Before
- def setup: Unit = {
+ def setup(): Unit = {
file = TestUtils.tempFile()
index = new TransactionIndex(offset, file)
}
@After
- def teardown: Unit = {
+ def teardown(): Unit = {
index.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 75a86d2..2390b5b 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -57,7 +57,7 @@ class MessageTest extends JUnitSuite {
}
@Test
- def testFieldValues {
+ def testFieldValues(): Unit = {
for(v <- messages) {
// check payload
if(v.payload == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 37c2619..e32f429 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -44,7 +44,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val overridingProps = new Properties
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
- def generateConfigs() =
+ def generateConfigs =
TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps))
val nMessages = 2
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index de0f901..6e7353c 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -62,7 +62,7 @@ class AsyncProducerTest {
Thread.sleep(500)
}
- def close {}
+ def close(): Unit = ()
}
val props = new Properties()
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index cde49de..fa1174d 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -37,7 +37,7 @@ import org.junit.Assert._
class SyncProducerTest extends KafkaServerTestHarness {
private val messageBytes = new Array[Byte](2)
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
- def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
+ def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
private def produceRequest(topic: String,
partition: Int,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 4d50cb8..646143c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -166,7 +166,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* Tests the migration tool when chroot is being used.
*/
@Test
- def testChroot {
+ def testChroot(): Unit = {
val zkUrl = zkConnect + "/kafka"
zkUtils.createPersistentPath("/kafka")
val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)