Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/14 18:46:36 UTC

[2/2] kafka git commit: MINOR: Enable a number of xlint scalac warnings

MINOR: Enable a number of xlint scalac warnings

Update the code where possible to fix the warnings. The `unused`
warning introduced in Scala 2.12 is quite handy and is, by itself,
a good reason to compile with Scala 2.12.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3464 from ijuma/scala-xlint
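
[Editor's note] For illustration, here is a minimal sketch (not from this
patch; names are hypothetical) of the kinds of code the Scala 2.12 unused
lint reports:

    import scala.collection.mutable.ListBuffer    // flagged: unused import

    object UnusedDemo {
      private val cached = Map("a" -> 1)          // flagged: private val is never used

      def lookup(key: String): Option[Int] = {
        val normalized = key.trim                 // flagged: local val is never used
        Map("a" -> 1).get(key)
      }
    }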


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1685e711
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1685e711
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1685e711

Branch: refs/heads/trunk
Commit: 1685e7112c5d4dc723ffcfa219febaed045b6426
Parents: e391045
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Jul 14 11:44:42 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jul 14 11:44:42 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    | 22 ++++++++++++++-
 .../kafka/admin/BrokerApiVersionsCommand.scala  |  3 +--
 .../kafka/admin/DeleteRecordsCommand.scala      |  2 --
 .../main/scala/kafka/admin/TopicCommand.scala   |  2 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |  2 --
 .../kafka/api/ControlledShutdownResponse.scala  |  2 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |  1 -
 .../scala/kafka/api/TopicMetadataRequest.scala  |  2 +-
 core/src/main/scala/kafka/cluster/Replica.scala |  2 --
 .../scala/kafka/common/TopicAndPartition.scala  |  1 -
 .../kafka/consumer/ConsumerConnector.scala      |  2 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |  2 --
 .../scala/kafka/consumer/ConsumerIterator.scala |  2 +-
 .../main/scala/kafka/consumer/KafkaStream.scala |  2 +-
 .../kafka/consumer/PartitionAssignor.scala      |  1 -
 .../controller/ControllerChannelManager.scala   | 20 ++------------
 .../kafka/controller/KafkaController.scala      |  2 +-
 ...nsactionMarkerRequestCompletionHandler.scala | 10 +++----
 .../kafka/javaapi/TopicMetadataRequest.scala    |  2 +-
 .../scala/kafka/javaapi/producer/Producer.scala |  2 +-
 .../main/scala/kafka/log/AbstractIndex.scala    |  6 ++---
 .../scala/kafka/log/LogCleanerManager.scala     |  2 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala |  5 ++--
 core/src/main/scala/kafka/log/TimeIndex.scala   |  9 +++----
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  4 +--
 .../main/scala/kafka/producer/Producer.scala    |  2 +-
 .../kafka/producer/async/EventHandler.scala     |  2 +-
 .../producer/async/ProducerSendThread.scala     |  2 +-
 .../main/scala/kafka/security/auth/Acl.scala    | 21 +++++++--------
 .../scala/kafka/server/ClientQuotaManager.scala |  2 +-
 .../scala/kafka/utils/IteratorTemplate.scala    |  4 +--
 .../main/scala/kafka/utils/KafkaScheduler.scala |  4 +--
 .../admin/BrokerApiVersionsCommandTest.scala    |  2 +-
 .../kafka/api/AdminClientIntegrationTest.scala  |  2 +-
 .../kafka/api/ConsumerBounceTest.scala          |  2 +-
 .../kafka/api/EndToEndAuthorizationTest.scala   | 28 +++++++++-----------
 .../kafka/api/EndToEndClusterIdTest.scala       |  2 +-
 .../kafka/api/ProducerBounceTest.scala          |  2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  2 +-
 .../api/RackAwareAutoTopicCreationTest.scala    |  2 +-
 .../api/SaslEndToEndAuthorizationTest.scala     |  2 +-
 .../kafka/api/TransactionsBounceTest.scala      | 13 +++++----
 .../tools/MirrorMakerIntegrationTest.scala      |  4 +--
 .../unit/kafka/admin/AddPartitionsTest.scala    | 12 ++++-----
 .../kafka/admin/DeleteConsumerGroupTest.scala   |  2 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |  3 ++-
 .../kafka/admin/ListConsumerGroupTest.scala     |  3 ++-
 .../admin/ReassignPartitionsClusterTest.scala   |  4 +--
 .../admin/ReassignPartitionsCommandTest.scala   |  2 +-
 .../unit/kafka/cluster/BrokerEndPointTest.scala |  2 +-
 .../ZkNodeChangeNotificationListenerTest.scala  |  4 +--
 .../kafka/consumer/ConsumerIteratorTest.scala   |  2 +-
 .../ZookeeperConsumerConnectorTest.scala        |  4 +--
 .../controller/ControllerFailoverTest.scala     |  2 +-
 .../kafka/integration/AutoOffsetResetTest.scala |  2 +-
 .../unit/kafka/integration/FetcherTest.scala    |  2 +-
 ...MetricsDuringTopicCreationDeletionTest.scala |  2 +-
 .../kafka/integration/MinIsrConfigTest.scala    |  2 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  2 +-
 .../kafka/integration/TopicMetadataTest.scala   | 22 +++++++--------
 .../integration/UncleanLeaderElectionTest.scala | 14 +++++-----
 .../ZookeeperConsumerConnectorTest.scala        |  3 ++-
 .../message/BaseMessageSetTestCases.scala       |  2 +-
 .../scala/unit/kafka/log/TimeIndexTest.scala    |  2 +-
 .../unit/kafka/log/TransactionIndexTest.scala   |  4 +--
 .../scala/unit/kafka/message/MessageTest.scala  |  2 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |  2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |  2 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  2 +-
 .../security/auth/ZkAuthorizationTest.scala     |  2 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala |  6 ++---
 .../unit/kafka/server/ApiVersionsTest.scala     |  2 +-
 .../unit/kafka/server/BaseRequestTest.scala     |  2 +-
 .../kafka/server/DynamicConfigChangeTest.scala  | 11 ++++----
 .../unit/kafka/server/EdgeCaseRequestTest.scala |  2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |  8 +++---
 .../unit/kafka/server/ServerStartupTest.scala   | 10 +++----
 .../epoch/LeaderEpochIntegrationTest.scala      |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  8 +++---
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 16 +++++------
 83 files changed, 186 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a493493..2921372 100644
--- a/build.gradle
+++ b/build.gradle
@@ -309,9 +309,29 @@ subprojects {
       "-feature",
       "-language:postfixOps",
       "-language:implicitConversions",
-      "-language:existentials"
+      "-language:existentials",
+      "-Xlint:by-name-right-associative",
+      "-Xlint:delayedinit-select",
+      "-Xlint:doc-detached",
+      "-Xlint:missing-interpolator",
+      "-Xlint:nullary-override",
+      "-Xlint:nullary-unit",
+      "-Xlint:option-implicit",
+      "-Xlint:package-object-classes",
+      "-Xlint:poly-implicit-overload",
+      "-Xlint:private-shadow",
+      "-Xlint:stars-align",
+      "-Xlint:type-parameter-shadow",
+      "-Xlint:unsound-match",
     ]
 
+    if (versions.baseScala != '2.11') {
+      scalaCompileOptions.additionalParameters += [
+        "-Xlint:constant",
+        "-Xlint:unused"
+      ]
+    }
+
     configure(scalaCompileOptions.forkOptions) {
       memoryMaximumSize = '1g'
       jvmArgs = ['-Xss2m'] + maxPermSizeArgs
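
[Editor's note] As one concrete example of what the newly enabled lints
catch, `-Xlint:missing-interpolator` flags string literals that reference
a `$identifier` in scope but lack the `s` interpolator prefix, a common
source of silently wrong log messages. A hedged sketch (not from this
patch):

    object InterpolatorDemo {
      def main(args: Array[String]): Unit = {
        val topic = "test"
        println("Deleting topic $topic")    // flagged: prints the literal text `$topic`
        println(s"Deleting topic $topic")   // fixed: prints "Deleting topic test"
      }
    }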

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 4aea3c0..b25a8da 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -24,7 +24,6 @@ import kafka.utils.CommandLineUtils
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
-import org.apache.kafka.common.Node
 
 import scala.util.{Failure, Success}
 
@@ -41,7 +40,7 @@ object BrokerApiVersionsCommand {
     val opts = new BrokerVersionCommandOptions(args)
     val adminClient = createAdminClient(opts)
     adminClient.awaitBrokers()
-    var brokerMap = adminClient.listAllBrokerVersionInfo()
+    val brokerMap = adminClient.listAllBrokerVersionInfo()
     brokerMap.foreach { case (broker, versionInfoOrError) =>
       versionInfoOrError match {
         case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 71dae8a..1a3b116 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -28,8 +28,6 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
 
-import scala.util.{Failure, Success}
-
 /**
  * A command for delete records of the given partitions down to the specified offset.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9e516b0..882fe21 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -366,7 +366,7 @@ object TopicCommand extends Logging {
     }
   }
 
-  def askToProceed: Unit = {
+  def askToProceed(): Unit = {
     println("Are you sure you want to continue? [y/n]")
     if (!Console.readLine().equalsIgnoreCase("y")) {
       println("Ending your session")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 71153d1..e1d6e02 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -17,8 +17,6 @@
 
 package kafka.admin
 
-import java.util.concurrent.LinkedBlockingQueue
-
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.exception.ZkException
 import kafka.utils.{CommandLineUtils, Logging, ZkUtils}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index e0a03e8..15992d2 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -44,7 +44,7 @@ case class ControlledShutdownResponse(correlationId: Int,
                                       error: Errors = Errors.NONE,
                                       partitionsRemaining: Set[TopicAndPartition])
   extends RequestOrResponse() {
-  def sizeInBytes(): Int ={
+  def sizeInBytes: Int = {
     var size =
       4 /* correlation id */ +
         2 /* error code */ +
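
[Editor's note] Several hunks in this patch apply the Scala arity
convention that these lints encourage: a side-effect-free accessor such as
`sizeInBytes` is declared without parentheses, while a side-effecting
method (like `askToProceed` above) keeps its empty parameter list. A
minimal sketch of the convention (illustrative names):

    class Connection {
      private var open = true

      def isOpen: Boolean = open   // pure accessor: no parentheses

      def close(): Unit = {        // performs a side effect: keep the parentheses
        open = false
      }
    }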

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ceed815..1f23e40 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -22,7 +22,6 @@ import kafka.api.ApiUtils._
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
 import kafka.network.RequestChannel
-import kafka.message.MessageSet
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
 import java.util

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 6bbcab5..032ff77 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -47,7 +47,7 @@ case class TopicMetadataRequest(versionId: Short,
     topics.foreach(topic => writeShortString(buffer, topic))
   }
 
-  def sizeInBytes(): Int = {
+  def sizeInBytes: Int = {
     2 +  /* version id */
     4 + /* correlation id */
     shortStringLength(clientId)  + /* client id */

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8f08089..183dc25 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -22,8 +22,6 @@ import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
 import org.apache.kafka.common.utils.Time
 
 class Replica(val brokerId: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 35b6bcd..4c94c73 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,7 +1,6 @@
 package kafka.common
 
 import kafka.cluster.{Partition, Replica}
-import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 46fbab7..f6d4a74 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -79,7 +79,7 @@ trait ConsumerConnector {
   /**
    * KAFKA-1743: This method added for backward compatibility.
    */
-  def commitOffsets
+  def commitOffsets()
 
   /**
    * Commit offsets from an external offsets map.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 51a7a04..7cccfe1 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -45,7 +45,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds),
                                        config.clientId, config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
-  private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
@@ -126,7 +125,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
     inLock(lock) {
       partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, tpi.partitionId), tpi)).toMap
-      this.cluster = cluster
       noLeaderPartitionSet ++= topicInfos.map(tpi => new TopicPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 9ca2253..f096c55 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import kafka.utils.{IteratorTemplate, Logging, CoreUtils}
+import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index faba42f..914cedd 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -37,7 +37,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
   /**
    *  Create an iterator over messages in the stream.
    */
-  def iterator(): ConsumerIterator[K,V] = iter
+  def iterator: ConsumerIterator[K,V] = iter
 
   /**
    * This method clears the queue being iterated during the consumer rebalancing. This is mainly

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 52c3d8b..5d4fb8b 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -17,7 +17,6 @@
 
 package kafka.consumer
 
-import org.I0Itec.zkclient.ZkClient
 import kafka.common.TopicAndPartition
 import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 369da05..ee8fa1e 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -502,33 +502,17 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
 
-class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => Unit = null,
-                         var updateMetadataResponseCallback: AbstractResponse => Unit = null,
-                         var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = null)
+class Callbacks private (var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit)
 
 object Callbacks {
   class CallbackBuilder {
-    var leaderAndIsrResponseCbk: AbstractResponse => Unit = null
-    var updateMetadataResponseCbk: AbstractResponse => Unit = null
     var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
 
-    def leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
-      leaderAndIsrResponseCbk = cbk
-      this
-    }
-
-    def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
-      updateMetadataResponseCbk = cbk
-      this
-    }
-
     def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = {
       stopReplicaResponseCbk = cbk
       this
     }
 
-    def build: Callbacks = {
-      new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
-    }
+    def build: Callbacks = new Callbacks(stopReplicaResponseCbk)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0ba412b..ff47f14 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -613,7 +613,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   def incrementControllerEpoch() = {
     try {
-      var newControllerEpoch = controllerContext.epoch + 1
+      val newControllerEpoch = controllerContext.epoch + 1
       val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
         ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
       if(!updateSucceeded)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 54960b9..19c37fa 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.WriteTxnMarkersResponse
 
 import scala.collection.mutable
-import collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                                                 txnStateManager: TransactionStateManager,
@@ -41,7 +41,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
       val correlation = requestHeader.correlationId
       trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected")
 
-      for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+      for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
         val transactionalId = txnIdAndMarker.txnId
         val txnMarker = txnIdAndMarker.txnMarkerEntry
 
@@ -82,7 +82,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                 txnMarker.producerEpoch,
                 txnMarker.transactionResult,
                 txnMarker.coordinatorEpoch,
-                txnMarker.partitions.toSet)
+                txnMarker.partitions.asScala.toSet)
             }
         }
       }
@@ -91,7 +91,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
 
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
-      for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+      for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
         val transactionalId = txnIdAndMarker.txnId
         val txnMarker = txnIdAndMarker.txnMarkerEntry
         val errors = writeTxnMarkerResponse.errors(txnMarker.producerId)
@@ -132,7 +132,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
               abortSending = true
             } else {
               txnMetadata synchronized {
-                for ((topicPartition: TopicPartition, error: Errors) <- errors) {
+                for ((topicPartition, error) <- errors.asScala) {
                   error match {
                     case Errors.NONE =>
                       txnMetadata.removePartition(topicPartition)
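
[Editor's note] Replacing the implicit `scala.collection.JavaConversions`
(deprecated in Scala 2.12) with `scala.collection.JavaConverters` makes
every Java/Scala collection conversion explicit via `.asScala`/`.asJava`.
A minimal sketch of the idiom (illustrative values):

    import java.util
    import scala.collection.JavaConverters._

    object ConvertersDemo {
      def main(args: Array[String]): Unit = {
        val javaList: util.List[String] = util.Arrays.asList("a", "b", "c")
        // the conversion is spelled out, so nothing is wrapped implicitly
        val upper = javaList.asScala.map(_.toUpperCase)
        println(upper.mkString(","))   // prints A,B,C
      }
    }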

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index efd5405..fdb14cb 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -39,7 +39,7 @@ class TopicMetadataRequest(val versionId: Short,
 
   def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
 
-  def sizeInBytes: Int = underlying.sizeInBytes()
+  def sizeInBytes: Int = underlying.sizeInBytes
 
   override def toString: String = {
     describe(true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 44f9245..b0b40b9 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -48,5 +48,5 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
    * Close API to close the producer pool connections to all Kafka brokers. Also closes
    * the zookeeper client connection if one exists
    */
-  def close = underlying.close
+  def close() = underlying.close()
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index a125676..bfc6828 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -37,8 +37,8 @@ import scala.math.ceil
  * @param baseOffset the base offset of the segment that this index is corresponding to.
  * @param maxIndexSize The maximum index size in bytes.
  */
-abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean)
-    extends Logging {
+abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
+                                   val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
 
   protected def entrySize: Int
 
@@ -109,7 +109,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
 
       /* Windows won't let us modify the file length while the file is mmapped :-( */
       if (OperatingSystem.IS_WINDOWS)
-        forceUnmap(mmap);
+        forceUnmap(mmap)
       try {
         raf.setLength(roundedNewSize)
         mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 6e0ebfb..4a4a59f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.common.LogCleaningAbortedException
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.TopicPartition

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e4939e8..53c18fe 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -48,8 +48,9 @@ import kafka.common.InvalidOffsetException
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
-    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) {
+// Avoid shadowing mutable `file` in AbstractIndex
+class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
+    extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
 
   override def entrySize = 8
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 19ab71a..6c9c32b 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -49,11 +49,9 @@ import org.apache.kafka.common.record.RecordBatch
  * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
  *
  */
-class TimeIndex(file: File,
-                baseOffset: Long,
-                maxIndexSize: Int = -1,
-                writable: Boolean = true)
-    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize, writable) with Logging {
+// Avoid shadowing mutable file in AbstractIndex
+class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
+    extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) with Logging {
 
   override def entrySize = 12
 
@@ -206,5 +204,4 @@ class TimeIndex(file: File,
       "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
           " bytes which is not positive or not a multiple of 12.")
   }
-
 }
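
[Editor's note] The `_file` rename in OffsetIndex and TimeIndex works
around `-Xlint:private-shadow`: a constructor parameter named `file`
would shadow the mutable `var file` inherited from AbstractIndex, so
reads in the subclass body could silently see the stale constructor
argument rather than the current field. A hedged sketch of the hazard
(illustrative names):

    class Base(@volatile var file: String)

    class Shadowing(file: String) extends Base(file) {
      // `file` binds to the constructor parameter, not the inherited var,
      // so it never reflects later reassignments of Base.file
      def describe: String = s"backed by $file"
    }

    class NonShadowing(_file: String) extends Base(_file) {
      def describe: String = s"backed by $file"   // reads the inherited var, as intended
    }

    // val s = new Shadowing("a.idx"); s.file = "b.idx"
    // s.describe == "backed by a.idx"   <- stale value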

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index ca623ae..1894213 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging {
    * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+  private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
@@ -52,7 +52,7 @@ trait KafkaMetricsGroup extends Logging {
   }
 
 
-  private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+  private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
     val nameBuilder: StringBuilder = new StringBuilder
 
     nameBuilder.append(group)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 2d2bfdb..d6cf4c8 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -132,7 +132,7 @@ class Producer[K,V](val config: ProducerConfig,
         KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
         if (producerSendThread != null)
           producerSendThread.shutdown
-        eventHandler.close
+        eventHandler.close()
         info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/async/EventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala
index 3a17bfb..44fb1eb 100644
--- a/core/src/main/scala/kafka/producer/async/EventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/EventHandler.scala
@@ -33,5 +33,5 @@ trait EventHandler[K,V] {
   /**
    * Cleans up and shuts down the event handler
   */
-  def close
+  def close(): Unit
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 79ed1b8..0377093 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -53,7 +53,7 @@ class ProducerSendThread[K,V](val threadName: String,
     }
   }
 
-  def shutdown = {
+  def shutdown(): Unit = {
     info("Begin shutting down ProducerSendThread")
     queue.put(shutdownCommand)
     shutdownLatch.await

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index f99a088..b84d75c 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -56,21 +56,20 @@ object Acl {
     if (aclJson == null || aclJson.isEmpty)
       return collection.immutable.Set.empty[Acl]
 
-    var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
-    Json.parseFull(aclJson).foreach { m =>
+    Json.parseFull(aclJson).toSet[Any].flatMap { m =>
       val aclMap = m.asInstanceOf[Map[String, Any]]
       //the acl json version.
       require(aclMap(VersionKey) == CurrentVersion)
-      val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
-      aclSet.foreach(item => {
-        val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
-        val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
-        val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
-        val host: String = item(HostsKey).asInstanceOf[String]
-        acls += new Acl(principal, permissionType, host, operation)
-      })
+      val aclSet = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
+      aclSet.map { item =>
+        val principal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
+        val permissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
+        val operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
+        val host = item(HostsKey).asInstanceOf[String]
+        new Acl(principal, permissionType, host, operation)
+      }
     }
-    acls.toSet
+
   }
 
   def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
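
[Editor's note] The Acl parsing rewrite above drops the mutable HashSet
accumulator in favour of a single flatMap/map pipeline over the parsed
Option. A simplified sketch of the same pattern (hypothetical parsing
logic):

    // Option.toSet yields an empty set for None, so the missing-input case
    // and the mapping collapse into one expression with no `var`.
    def parseIds(input: Option[String]): Set[Int] =
      input.toSet[String].flatMap { s =>
        s.split(",").map(_.trim.toInt)
      }

    // parseIds(Some("1, 2, 3")) == Set(1, 2, 3)
    // parseIds(None)            == Set()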

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5c85eef..3970a4b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -215,7 +215,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
         // Compute the delay
         val clientQuotaEntity = clientSensors.quotaEntity
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
-        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
+        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/utils/IteratorTemplate.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
index 17c152d..7cd161e 100644
--- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala
+++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
@@ -42,12 +42,12 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T
   }
   
   def peek(): T = {
-    if(!hasNext())
+    if(!hasNext)
       throw new NoSuchElementException()
     nextItem
   }
   
-  def hasNext(): Boolean = {
+  def hasNext: Boolean = {
     if(state == FAILED)
       throw new IllegalStateException("Iterator is in failed state")
     state match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 8e130cf..d20fdd7 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -127,8 +127,8 @@ class KafkaScheduler(val threads: Int,
     }
   }
   
-  private def ensureRunning = {
-    if(!isStarted)
+  private def ensureRunning(): Unit = {
+    if (!isStarted)
       throw new IllegalStateException("Kafka scheduler is not running.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 35bdded..00a7c9f 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -30,7 +30,7 @@ import org.junit.Test
 
 class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
 
-  def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+  def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
 
   @Test(timeout=120000)
   def checkBrokerApiVersionCommandOutput() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 4c74bca..012f254 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -364,7 +364,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     client.close()
   }
 
-  override def generateConfigs() = {
+  override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
     cfgs.foreach { config =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index d146e9d..27cafd7 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -59,7 +59,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
-  override def generateConfigs() = {
+  override def generateConfigs = {
     FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, serverConfig))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 3866cc1..3376d23 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -187,14 +187,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * Tests the ability of producing and consuming with the appropriate ACLs set.
     */
   @Test
-  def testProduceConsumeViaAssign {
+  def testProduceConsumeViaAssign(): Unit = {
     setAclsAndProduce()
     consumers.head.assign(List(tp).asJava)
     consumeRecords(this.consumers.head, numRecords)
   }
 
   @Test
-  def testProduceConsumeViaSubscribe {
+  def testProduceConsumeViaSubscribe(): Unit = {
     setAclsAndProduce()
     consumers.head.subscribe(List(topic).asJava)
     consumeRecords(this.consumers.head, numRecords)
@@ -215,12 +215,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * isn't set.
     */
   @Test(expected = classOf[TimeoutException])
-  def testNoProduceWithoutDescribeAcl {
+  def testNoProduceWithoutDescribeAcl(): Unit = {
     sendRecords(numRecords, tp)
   }
 
   @Test
-  def testNoProduceWithDescribeAcl {
+  def testNoProduceWithDescribeAcl(): Unit = {
     AclCommand.main(describeAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
@@ -239,7 +239,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * ACL set.
     */
   @Test(expected = classOf[KafkaException])
-  def testNoConsumeWithoutDescribeAclViaAssign {
+  def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
     noConsumeWithoutDescribeAclSetup
     consumers.head.assign(List(tp).asJava)
     // the exception is expected when the consumer attempts to lookup offsets
@@ -247,14 +247,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   }
   
   @Test(expected = classOf[TimeoutException])
-  def testNoConsumeWithoutDescribeAclViaSubscribe {
+  def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup
     consumers.head.subscribe(List(topic).asJava)
     // this should timeout since the consumer will not be able to fetch any metadata for the topic
     consumeRecords(this.consumers.head, timeout = 3000)
   }
   
-  private def noConsumeWithoutDescribeAclSetup {
+  private def noConsumeWithoutDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs)
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
@@ -270,13 +270,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
     }
   }
- 
-  /**
-    * Tests that a consumer fails to consume messages without the appropriate
-    * ACL set.
-    */
+  
   @Test
-  def testNoConsumeWithDescribeAclViaAssign {
+  def testNoConsumeWithDescribeAclViaAssign(): Unit = {
     noConsumeWithDescribeAclSetup
     consumers.head.assign(List(tp).asJava)
 
@@ -290,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   }
   
   @Test
-  def testNoConsumeWithDescribeAclViaSubscribe {
+  def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
     noConsumeWithDescribeAclSetup
     consumers.head.subscribe(List(topic).asJava)
 
@@ -303,7 +299,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
   }
   
-  private def noConsumeWithDescribeAclSetup {
+  private def noConsumeWithDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs)
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
@@ -318,7 +314,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * ACL set.
     */
   @Test
-  def testNoGroupAcl {
+  def testNoGroupAcl(): Unit = {
     AclCommand.main(produceAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 6a4c552..6c61cd9 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   val topicAndPartition = new TopicAndPartition(topic, part)
   this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter")
 
-  override def generateConfigs() = {
+  override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
     cfgs.foreach(_.putAll(serverConfig))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 9fe0e5c..aa92f40 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -51,7 +51,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   //
   // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
   // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
-  override def generateConfigs() = {
+  override def generateConfigs = {
     FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 0c44ca9..49a096a 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -49,7 +49,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
 
-  def generateConfigs() =
+  def generateConfigs =
     TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
 
   private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index a2f2041..cb5262d 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -35,7 +35,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
   overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
   overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)
 
-  def generateConfigs() =
+  def generateConfigs =
     (0 until numServers) map { node =>
       TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
     } map (KafkaConfig.fromProps(_, overridingProps))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index 7e549c8..cc9ee3e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -53,7 +53,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
     * the second one connects ok, but fails to consume messages due to the ACL.
     */
   @Test(timeout = 15000)
-  def testTwoConsumersWithDifferentSaslCredentials {
+  def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
     setAclsAndProduce()
     val consumer1 = consumers.head
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 0e216a2..810f481 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -22,13 +22,12 @@ import java.util.Properties
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.junit.Assert._
 
 
@@ -67,7 +66,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
   //
   // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
   // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
-  override def generateConfigs() = {
+  override def generateConfigs = {
     FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
@@ -105,7 +104,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
             !shouldAbort), new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
         }
         trace(s"Sent ${records.size} messages. Committing offsets.")
-        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
+        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroup)
 
         if (shouldAbort) {
           trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
@@ -150,7 +149,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
 
     val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId,
       securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
-    consumer.subscribe(topics)
+    consumer.subscribe(topics.asJava)
     consumer
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index b7b1a12..1f9851d 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -30,8 +30,8 @@ import org.junit.Test
 
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
-  override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
-    .map(KafkaConfig.fromProps(_, new Properties()))
+  override def generateConfigs: Seq[KafkaConfig] =
+    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
 
   @Test
   def testCommaSeparatedRegex(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index d08552e..9bc362c 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -64,7 +64,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testTopicDoesNotExist {
+  def testTopicDoesNotExist(): Unit = {
     try {
       AdminUtils.addPartitions(zkUtils, "Blah", 1)
       fail("Topic should not exist")
@@ -74,7 +74,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testWrongReplicaCount {
+  def testWrongReplicaCount(): Unit = {
     try {
       AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
       fail("Add partitions should fail")
@@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testIncrementPartitions {
+  def testIncrementPartitions(): Unit = {
     AdminUtils.addPartitions(zkUtils, topic1, 3)
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
@@ -111,7 +111,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testManualAssignmentOfReplicas {
+  def testManualAssignmentOfReplicas(): Unit = {
     AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReplicaPlacementAllServers {
+  def testReplicaPlacementAllServers(): Unit = {
     AdminUtils.addPartitions(zkUtils, topic3, 7)
 
     // read metadata from a broker and verify the new topic partitions exist
@@ -166,7 +166,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReplicaPlacementPartialServers {
+  def testReplicaPlacementPartialServers(): Unit = {
     AdminUtils.addPartitions(zkUtils, topic2, 3)
 
     // read metadata from a broker and verify the new topic partitions exist

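The `def testTopicDoesNotExist {` to `def testTopicDoesNotExist(): Unit = {` rewrites above (and the similar zkUpdateDelay and close changes further down) retire Scala's deprecated procedure syntax in favor of explicit parentheses and an explicit Unit result type: side-effecting methods get parens by convention, and a forgotten `=` can no longer silently turn a value-returning method into one that returns Unit. A tiny sketch:

    object ProcedureSyntaxExample {
      // Before: def greet { println("hi") }  -- deprecated procedure syntax,
      // which infers Unit implicitly.
      // After: parens mark the side effect and the result type is explicit.
      def greet(): Unit = {
        println("hi")
      }

      def main(args: Array[String]): Unit = greet()
    }
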
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index aa202bc..a8955f5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -28,7 +28,7 @@ import kafka.integration.KafkaServerTestHarness
 
 @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class DeleteConsumerGroupTest extends KafkaServerTestHarness {
-  def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
+  def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
 
   @Test
   def testGroupWideDeleteInZK() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 2c09cc4..7000308 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -47,7 +47,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   private var consumerGroupExecutor: ConsumerGroupExecutor = _
 
   // configure the servers and clients
-  override def generateConfigs() = {
+  override def generateConfigs = {
     TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
       KafkaConfig.fromProps(props)
     }
@@ -274,6 +274,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     }
   }
 
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
   private def createOldConsumer(): Unit = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", group)

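The new @deprecated annotation on createOldConsumer relies on a standard scalac rule: deprecated definitions may reference other deprecated definitions without triggering a warning, so annotating the caller silences the deprecation warnings from the old-consumer API it exercises. A sketch with hypothetical names and version strings:

    object DeprecationExample {
      @deprecated("Use newApi instead.", "1.0.0")
      def oldApi(): String = "old"

      // Annotating the caller suppresses the deprecation warning that the
      // oldApi() call would otherwise emit under -deprecation.
      @deprecated("Calls a deprecated API and will be removed with it.", "1.0.0")
      def legacyPath(): String = oldApi()
    }
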
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index c03be66..6727fad 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -40,7 +40,8 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
   val props = new Properties
 
   // configure the servers and clients
-  override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+  override def generateConfigs =
+    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
 
   @Before
   override def setUp() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index e3b0aa8..dadd002 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -30,7 +30,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   var servers: Seq[KafkaServer] = null
   val topicName = "my-topic"
   val delayMs = 1000
-  def zkUpdateDelay = {Thread.sleep(delayMs)}
+  def zkUpdateDelay(): Unit = Thread.sleep(delayMs)
 
   @Before
   override def setUp() {
@@ -49,7 +49,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldMoveSinglePartition {
+  def shouldMoveSinglePartition(): Unit = {
     //Given a single replica on server 100
     startBrokers(Seq(100, 101))
     val partition = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 9e23983..c75c28a 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -133,7 +133,7 @@ class ReassignPartitionsCommandTest extends Logging {
           case "topic2" =>
             assertEquals("0:101,0:102", configChange.get(LeaderReplicationThrottledReplicasProp))
             assertEquals("0:100", configChange.get(FollowerReplicationThrottledReplicasProp))
-          case _ => fail("Unexpected topic $topic")
+          case _ => fail(s"Unexpected topic $topic")
         }
         calls += 1
       }

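The one-character fix above is a genuine bug fix: without the `s` prefix, "Unexpected topic $topic" is a plain string literal, so the failure message would print $topic verbatim instead of the topic name. This is exactly the class of mistake the -Xlint:missing-interpolator warning ("possible missing interpolator") is designed to catch. A sketch:

    object InterpolatorExample {
      def main(args: Array[String]): Unit = {
        val topic = "topic2"
        println("Unexpected topic $topic")   // prints the literal text "$topic"
        println(s"Unexpected topic $topic")  // prints "Unexpected topic topic2"
      }
    }
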
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 20b7e25..2578243 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -67,7 +67,7 @@ class BrokerEndPointTest extends Logging {
   }
 
   @Test
-  def testFromJsonV2 {
+  def testFromJsonV2(): Unit = {
     val brokerInfoStr = """{
       "version":2,
       "host":"localhost",

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index f7dd40f..368ee0d 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -18,12 +18,12 @@ package kafka.common
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, ZkUtils}
+import kafka.utils.TestUtils
 import org.junit.Test
 
 class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
 
-  override def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+  override def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   @Test
   def testProcessNotification() {

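Hunks like the one above, which only trim an import list (ZkUtils here; File and CoreUtils in TopicMetadataTest below), are driven by Scala 2.12's unused-symbol lint: under -Xlint:unused, which folds in the -Ywarn-unused checks, the compiler flags imports that are never referenced. A sketch of what the warning catches (file and object names are illustrative):

    // Compiled with: scalac -Xlint:unused Example.scala  (Scala 2.12+)
    import scala.collection.mutable   // flagged: "Unused import"

    object Example {
      def main(args: Array[String]): Unit =
        println("nothing here uses scala.collection.mutable")
    }
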
http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 0d38e10..5571e03 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -38,7 +38,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
 
   val numNodes = 1
 
-  def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
+  def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
 
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index df80d1d..bbf05e4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -45,8 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
-    .map(KafkaConfig.fromProps(_, overridingProps))
+  override def generateConfigs =
+    TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
 
   val group = "group1"
   val consumer0 = "consumer0"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 13b7285..446d8ae 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -40,7 +40,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   val metrics = new Metrics()
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
+  override def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect)
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 2221d90..fb76ca1 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -31,7 +31,7 @@ import org.junit.Assert._
 @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
 class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
 
-  def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+  def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   val topic = "test_topic"
   val group = "default_group"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 6076089..f23225c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -32,7 +32,7 @@ import kafka.utils.TestUtils
 @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class FetcherTest extends KafkaServerTestHarness {
   val numNodes = 1
-  def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
+  def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
 
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 60a6fb6..bec5026 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -48,7 +48,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
 
   @volatile private var running = true
   
-  override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
+  override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
index 3977601..455bbde 100644
--- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
@@ -27,7 +27,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
-  def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+  def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
 
   @Test
   def testDefaultKafkaConfig() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index ff573bc..bc0b81a 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.TopicPartition
 class PrimitiveApiTest extends ProducerConsumerTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+  def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   @Test
   def testFetchRequestCanProperlySerialize() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 07af590..66103cc 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,14 +17,12 @@
 
 package kafka.integration
 
-import java.io.File
-
 import kafka.admin.AdminUtils
 import kafka.api.TopicMetadataResponse
 import kafka.client.ClientUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
@@ -59,7 +57,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testBasicTopicMetadata {
+  def testBasicTopicMetadata(): Unit = {
     // create topic
     val topic = "test"
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
@@ -77,7 +75,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetAllTopicMetadata {
+  def testGetAllTopicMetadata(): Unit = {
     // create topic
     val topic1 = "testGetAllTopicMetadata1"
     val topic2 = "testGetAllTopicMetadata2"
@@ -102,7 +100,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testAutoCreateTopic {
+  def testAutoCreateTopic(): Unit = {
     // auto create topic
     val topic = "testAutoCreateTopic"
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
@@ -129,7 +127,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testAutoCreateTopicWithInvalidReplication {
+  def testAutoCreateTopicWithInvalidReplication(): Unit = {
     val adHocProps = createBrokerConfig(2, zkConnect)
     // Set default replication higher than the number of live brokers
     adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
@@ -152,7 +150,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testAutoCreateTopicWithCollision {
+  def testAutoCreateTopicWithCollision(): Unit = {
     // auto create topic
     val topic1 = "testAutoCreate_Topic"
     val topic2 = "testAutoCreate.Topic"
@@ -212,7 +210,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testIsrAfterBrokerShutDownAndJoinsBack {
+  def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
     val numBrokers = 2 //just 2 brokers are enough for the test
 
     // start adHoc brokers
@@ -260,12 +258,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testAliveBrokerListWithNoTopics {
+  def testAliveBrokerListWithNoTopics(): Unit = {
     checkMetadata(Seq(server1), 1)
   }
 
   @Test
-  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup(): Unit = {
     adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
 
     checkMetadata(adHocServers, numConfigs - 1)
@@ -278,7 +276,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
 
 
   @Test
-  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown(): Unit = {
     adHocServers = adHocConfigs.map(p => createServer(p))
 
     checkMetadata(adHocServers, numConfigs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 25ed480..24421d0 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -102,7 +102,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUncleanLeaderElectionEnabled {
+  def testUncleanLeaderElectionEnabled(): Unit = {
     // enable unclean leader election
     configProps1.put("unclean.leader.election.enable", "true")
     configProps2.put("unclean.leader.election.enable", "true")
@@ -116,7 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   @Test
   @Ignore // Should be re-enabled after KAFKA-3096 is fixed
-  def testUncleanLeaderElectionDisabled {
+  def testUncleanLeaderElectionDisabled(): Unit = {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
 
@@ -127,7 +127,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUncleanLeaderElectionEnabledByTopicOverride {
+  def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = {
     // disable unclean leader election globally, but enable for our specific test topic
     configProps1.put("unclean.leader.election.enable", "false")
     configProps2.put("unclean.leader.election.enable", "false")
@@ -144,7 +144,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   @Test
   @Ignore // Should be re-enabled after KAFKA-3096 is fixed
-  def testCleanLeaderElectionDisabledByTopicOverride {
+  def testCleanLeaderElectionDisabledByTopicOverride(): Unit = {
     // enable unclean leader election globally, but disable for our specific test topic
     configProps1.put("unclean.leader.election.enable", "true")
     configProps2.put("unclean.leader.election.enable", "true")
@@ -160,7 +160,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUncleanLeaderElectionInvalidTopicOverride {
+  def testUncleanLeaderElectionInvalidTopicOverride(): Unit = {
     startBrokers(Seq(configProps1))
 
     // create topic with an invalid value for unclean leader election
@@ -172,7 +172,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     }
   }
 
-  def verifyUncleanLeaderElectionEnabled {
+  def verifyUncleanLeaderElectionEnabled(): Unit = {
     // wait until leader is elected
     val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
@@ -205,7 +205,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List("first", "third"), consumeAllMessages(topic))
   }
 
-  def verifyUncleanLeaderElectionDisabled {
+  def verifyUncleanLeaderElectionDisabled(): Unit = {
     // wait until leader is elected
     val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 7d8e0c2..2a0525b 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -45,7 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+  def generateConfigs =
+    TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
 
   val group = "group1"
   val consumer1 = "consumer1"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index a53602d..199bbbd 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
 
   @Test
-  def testWrittenEqualsRead {
+  def testWrittenEqualsRead(): Unit = {
     val messageSet = createMessageSet(messages)
     assertEquals(messages.toSeq, messageSet.asScala.map(m => m.message))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
index bc60c72..c6112a1 100644
--- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -35,7 +35,7 @@ class TimeIndexTest extends JUnitSuite {
 
   @Before
   def setup() {
-    this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
+    this.idx = new TimeIndex(nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 16173eb..9b90e91 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -30,13 +30,13 @@ class TransactionIndexTest extends JUnitSuite {
   val offset = 0L
 
   @Before
-  def setup: Unit = {
+  def setup(): Unit = {
     file = TestUtils.tempFile()
     index = new TransactionIndex(offset, file)
   }
 
   @After
-  def teardown: Unit = {
+  def teardown(): Unit = {
     index.close()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 75a86d2..2390b5b 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -57,7 +57,7 @@ class MessageTest extends JUnitSuite {
   }
 
   @Test
-  def testFieldValues {
+  def testFieldValues(): Unit = {
     for(v <- messages) {
       // check payload
       if(v.payload == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 37c2619..e32f429 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -44,7 +44,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   val overridingProps = new Properties
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  def generateConfigs() =
+  def generateConfigs =
     TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps))
 
   val nMessages = 2

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index de0f901..6e7353c 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -62,7 +62,7 @@ class AsyncProducerTest {
         Thread.sleep(500)
       }
 
-      def close {}
+      def close(): Unit = ()
     }
 
     val props = new Properties()

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index cde49de..fa1174d 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -37,7 +37,7 @@ import org.junit.Assert._
 class SyncProducerTest extends KafkaServerTestHarness {
   private val messageBytes =  new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
-  def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
+  def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
 
   private def produceRequest(topic: String,
     partition: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1685e711/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 4d50cb8..646143c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -166,7 +166,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Tests the migration tool when chroot is being used.
    */
   @Test
-  def testChroot {
+  def testChroot(): Unit = {
     val zkUrl = zkConnect + "/kafka"
     zkUtils.createPersistentPath("/kafka")
     val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)