Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/25 02:17:46 UTC

[3/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables

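Most of the hunks in this patch replace a binding that is never read with the wildcard `_`, typically in `catch` pattern matches. A minimal standalone sketch of that pattern, assuming a made-up readOrDefault helper (illustrative only, not code from the Kafka tree):

    import java.io.EOFException

    object UnusedBindingSketch {
      // before: `case eofe: EOFException => -1` bound a name that was never used;
      // after: the wildcard states explicitly that the exception value is ignored.
      def readOrDefault(read: () => Int): Int =
        try read()
        catch {
          case _: EOFException => -1
        }

      def main(args: Array[String]): Unit =
        println(readOrDefault(() => throw new EOFException("end of stream"))) // prints -1
    }
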
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 1ef91b9..850b0e0 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -110,7 +110,7 @@ object ByteBufferMessageSet {
           while (true)
             innerMessageAndOffsets.add(readMessageFromStream(compressed))
         } catch {
-          case eofe: EOFException =>
+          case _: EOFException =>
             // we don't do anything at all here, because the finally
             // will close the compressed input stream, and we simply
             // want to return the innerMessageAndOffsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 7daab67..13b57e3 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -158,23 +158,16 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   )
 
   private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
-    val filteredTags = tags
-      .filter { case (tagKey, tagValue) => tagValue != ""}
+    val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
     if (filteredTags.nonEmpty) {
-      val tagsString = filteredTags
-        .map { case (key, value) => "%s=%s".format(key, value)}
-        .mkString(",")
-
+      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
       Some(tagsString)
     }
-    else {
-      None
-    }
+    else None
   }
 
   private def toScope(tags: collection.Map[String, String]): Option[String] = {
-    val filteredTags = tags
-      .filter { case (tagKey, tagValue) => tagValue != ""}
+    val filteredTags = tags.filter { case (_, tagValue) => tagValue != ""}
     if (filteredTags.nonEmpty) {
       // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
       val tagsString = filteredTags
@@ -184,9 +177,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
 
       Some(tagsString)
     }
-    else {
-      None
-    }
+    else None
   }
 
   def removeAllConsumerMetrics(clientId: String) {

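The KafkaMetricsGroup hunks above combine two of the clean-ups: the unused tuple component in `filter` becomes `_`, and the single-expression `else { None }` drops its braces. A self-contained sketch of the same shape, with hypothetical tag values:

    object TagFilterSketch {
      def toMBeanName(tags: Map[String, String]): Option[String] = {
        // only the value is inspected, so the key position in the pattern is `_`
        val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
        if (filteredTags.nonEmpty)
          Some(filteredTags.map { case (key, value) => s"$key=$value" }.mkString(","))
        else None
      }

      def main(args: Array[String]): Unit =
        println(toMBeanName(Map("topic" -> "payments", "clientId" -> ""))) // Some(topic=payments)
    }
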
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 5408e0d..0f10577 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -82,7 +82,7 @@ class BlockingChannel( val host: String,
                          connectTimeoutMs))
 
       } catch {
-        case e: Throwable => disconnect()
+        case _: Throwable => disconnect()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 6b10e51..f793811 100755
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -24,8 +24,6 @@ import org.apache.kafka.common.utils.Utils
 @deprecated("This class has been deprecated and will be removed in a future release. " +
             "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
 class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  private val random = new java.util.Random
-  
   def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index c2f95ea..2d2bfdb 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -104,7 +104,7 @@ class Producer[K,V](val config: ProducerConfig,
             }
           }
           catch {
-            case e: InterruptedException =>
+            case _: InterruptedException =>
               false
           }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 797c77b..17d09ce 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -25,7 +25,7 @@ object Resource {
   def fromString(str: String): Resource = {
     str.split(Separator, 2) match {
       case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name)
-      case s => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+      case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 72f79d5..5cfdcd6 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -316,13 +316,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     try {
       zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         try {
           debug(s"Node $path does not exist, attempting to create it.")
           zkUtils.createPersistentPath(path, data)
           (true, 0)
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             debug(s"Failed to create node for $path because it already exists.")
             (false, 0)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index d87a8cf..5e584ab 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -97,7 +97,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 
   def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
     mapLock synchronized {
-      for ((key, fetcher) <- fetcherThreadMap)
+      for (fetcher <- fetcherThreadMap.values)
         fetcher.removePartitions(partitions)
     }
     info("Removed fetcher for partitions %s".format(partitions.mkString(",")))

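The AbstractFetcherManager loop only uses the map's values, so it now iterates `fetcherThreadMap.values` instead of destructuring a `(key, fetcher)` pair and discarding the key. A small sketch with made-up data:

    object ValuesIterationSketch {
      def main(args: Array[String]): Unit = {
        val fetcherThreadMap = Map("broker-1" -> Seq(0, 1), "broker-2" -> Seq(2))
        // before: for ((key, fetcher) <- fetcherThreadMap) fetcher.removePartitions(partitions)
        for (partitions <- fetcherThreadMap.values)
          println(s"removing ${partitions.size} partitions")
      }
    }
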
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8cb2270..325c7af 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -127,7 +127,7 @@ class AdminManager(val config: KafkaConfig,
           AdminUtils.deleteTopic(zkUtils, topic)
           DeleteTopicMetadata(topic, Errors.NONE)
         } catch {
-          case e: TopicAlreadyMarkedForDeletionException =>
+          case _: TopicAlreadyMarkedForDeletionException =>
             // swallow the exception, and still track deletion allowing multiple calls to wait for deletion
             DeleteTopicMetadata(topic, Errors.NONE)
           case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 00e5d0c..cc2c4cd 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -65,7 +65,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
             throw new IOException("Unrecognized version of the server meta.properties file: " + version)
         }
       } catch {
-        case e: FileNotFoundException =>
+        case _: FileNotFoundException =>
           warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
           None
         case e1: Exception =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c4472c6..0c7c26b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -183,7 +183,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       // trigger the callback immediately if quota is not violated
       callback(0)
     } catch {
-      case qve: QuotaViolationException =>
+      case _: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
         throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
@@ -412,9 +412,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
           logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
           (sanitizedUser, clientId) match {
-            case (Some(u), Some(c)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-            case (Some(u), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-            case (None, Some(c)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+            case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+            case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+            case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
             case (None, None) =>
           }
         case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 2feeae8..4bf04e6 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -112,10 +112,10 @@ class DelayedFetch(delayMs: Long,
             }
           }
         } catch {
-          case utpe: UnknownTopicOrPartitionException => // Case B
+          case _: UnknownTopicOrPartitionException => // Case B
             debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
             return forceComplete()
-          case nle: NotLeaderForPartitionException =>  // Case A
+          case _: NotLeaderForPartitionException =>  // Case A
             debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
             return forceComplete()
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index b31d838..2e9e714 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -88,7 +88,6 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
                            private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = SystemTime) extends Logging {
-  private var lastExecutedChange = -1L
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(json: String) = {
@@ -106,7 +105,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
                 "Supported versions are 1 and 2.")
           }
 
-        case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+        case _ => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
           "{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
           "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
           " Received: " + json)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1b723f..c6c8dbd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -755,7 +755,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
         java.util.Collections.emptyList())
     } catch {
-      case e: TopicExistsException => // let it go, possibly another broker created this topic
+      case _: TopicExistsException => // let it go, possibly another broker created this topic
         new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
           java.util.Collections.emptyList())
       case ex: Throwable  => // Catch all to prevent unhandled errors

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 117899b..0ae9124 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -43,7 +43,6 @@ class KafkaHealthcheck(brokerId: Int,
                        rack: Option[String],
                        interBrokerProtocolVersion: ApiVersion) extends Logging {
 
-  private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   private[server] val sessionExpireListener = new SessionExpireListener
 
   def startup() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index df46336..a39fe49 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -111,7 +111,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
             throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
         }
       } catch {
-        case e: NumberFormatException => throw malformedLineException(line)
+        case _: NumberFormatException => throw malformedLineException(line)
       } finally {
         reader.close()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32bc660..b43695a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,20 +223,20 @@ class ReplicaManager(val config: KafkaConfig,
       deletePartition.toString, topic, partitionId))
     val errorCode = Errors.NONE.code
     getPartition(topic, partitionId) match {
-      case Some(partition) =>
-        if(deletePartition) {
+      case Some(_) =>
+        if (deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
           if (removedPartition != null) {
             removedPartition.delete() // this will delete the local log
             val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
             if (!topicHasPartitions)
-                BrokerTopicStats.removeMetrics(topic)
+              BrokerTopicStats.removeMetrics(topic)
           }
         }
       case None =>
         // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
         // This could happen when topic is being deleted while broker is down and recovers.
-        if(deletePartition) {
+        if (deletePartition) {
           val topicAndPartition = TopicAndPartition(topic, partitionId)
 
           if(logManager.getLog(topicAndPartition).isDefined) {
@@ -358,10 +358,9 @@ class ReplicaManager(val config: KafkaConfig,
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
       // Just return an error and don't handle the request at all
-      val responseStatus = messagesPerPartition.map {
-        case (topicAndPartition, messageSet) =>
-          topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
-            LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
+      val responseStatus = messagesPerPartition.map { case (topicAndPartition, _) =>
+        topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
       }
       responseCallback(responseStatus)
     }
@@ -657,11 +656,9 @@ class ReplicaManager(val config: KafkaConfig,
     replicaStateChangeLock synchronized {
       val responseMap = new mutable.HashMap[TopicPartition, Short]
       if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
         stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
           correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
-        }
         BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
@@ -694,7 +691,7 @@ class ReplicaManager(val config: KafkaConfig,
           }
         }
 
-        val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
+        val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
           stateInfo.leader == config.brokerId
         }
         val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
@@ -830,7 +827,7 @@ class ReplicaManager(val config: KafkaConfig,
         val newLeaderBrokerId = partitionStateInfo.leader
         metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
           // Only change partition state when the leader is available
-          case Some(leaderBroker) =>
+          case Some(_) =>
             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
               partitionsToMakeFollower += partition
             else

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 5c487bf..bb6caa0 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -83,7 +83,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
       leaderId = brokerId
       onBecomingLeader()
     } catch {
-      case e: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         // If someone else has written the path, then
         leaderId = getControllerID 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 56ae0c9..eea66f8 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -119,11 +119,11 @@ object ConsoleConsumer extends Logging {
       val msg: BaseConsumerRecord = try {
         consumer.receive()
       } catch {
-        case nse: StreamEndException =>
+        case _: StreamEndException =>
           trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
           // Consumer is already closed
           return
-        case nse: WakeupException =>
+        case _: WakeupException =>
           trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
           // Consumer will be closed
           return
@@ -358,7 +358,7 @@ object ConsoleConsumer extends Logging {
             val offset =
               try offsetString.toLong
               catch {
-                case e: NumberFormatException => invalidOffset(offsetString)
+                case _: NumberFormatException => invalidOffset(offsetString)
               }
             if (offset < 0) invalidOffset(offsetString)
             offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 63a04c9..2b3f56d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -71,7 +71,7 @@ object ConsumerPerformance {
       val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
       val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
       var threadList = List[ConsumerPerfThread]()
-      for ((topic, streamList) <- topicMessageStreams)
+      for (streamList <- topicMessageStreams.values)
         for (i <- 0 until streamList.length)
           threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f4f7acf..c299676 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -414,12 +414,10 @@ object DumpLogSegments {
         }
       }
 
-      shallowOffsetNotFound.foreach {
-        case (fileName, listOfShallowOffsetNotFound) => {
-          System.err.println("The following indexed offsets are not found in the log.")
-          listOfShallowOffsetNotFound.foreach(m => {
-            System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2))
-          })
+      shallowOffsetNotFound.values.foreach { listOfShallowOffsetNotFound =>
+        System.err.println("The following indexed offsets are not found in the log.")
+        listOfShallowOffsetNotFound.foreach { case (indexedOffset, logOffset) =>
+          System.err.println(s"Indexed offset: $indexedOffset, found log offset: $logOffset")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 1c92088..9aaad3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -122,8 +122,7 @@ object EndToEndLatency {
 
       //Check we only got the one message
       if (recordIter.hasNext) {
-        var count = 1
-        for (elem <- recordIter) count += 1
+        val count = 1 + recordIter.size
         throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
       }
 

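The EndToEndLatency change collapses a manual counting loop into `Iterator.size`, which consumes the remaining elements and returns how many there were; exhausting the iterator is harmless here because the method throws immediately afterwards. A sketch with made-up records:

    object IteratorCountSketch {
      def main(args: Array[String]): Unit = {
        val recordIter = Iterator("r1", "r2", "r3")
        recordIter.next() // the single expected record has already been read
        if (recordIter.hasNext) {
          // before: var count = 1; for (elem <- recordIter) count += 1
          val count = 1 + recordIter.size
          println(s"Only one result was expected but found [$count]") // found [3]
        }
      }
    }
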
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 17b8f0b..1a6ba69 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -375,7 +375,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           mirrorMakerConsumer.commit()
           throw e
 
-        case e: CommitFailedException =>
+        case _: CommitFailedException =>
           warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
             "another instance. If you see this regularly, it could indicate that you need to either increase " +
             s"the consumer's ${consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
@@ -435,9 +435,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               maybeFlushAndCommitOffsets()
             }
           } catch {
-            case cte: ConsumerTimeoutException =>
+            case _: ConsumerTimeoutException =>
               trace("Caught ConsumerTimeoutException, continue iteration.")
-            case we: WakeupException =>
+            case _: WakeupException =>
               trace("Caught ConsumerWakeupException, continue iteration.")
           }
           maybeFlushAndCommitOffsets()
@@ -485,7 +485,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         mirrorMakerConsumer.stop()
       }
       catch {
-        case ie: InterruptedException =>
+        case _: InterruptedException =>
           warn("Interrupt during shutdown of the mirror maker thread")
       }
     }
@@ -495,7 +495,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         shutdownLatch.await()
         info("Mirror maker thread shutdown complete")
       } catch {
-        case ie: InterruptedException =>
+        case _: InterruptedException =>
           warn("Shutdown of the mirror maker thread interrupted")
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index d88ec41..4e2c7ef 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -32,9 +32,6 @@ object ReplayLogProducer extends Logging {
   def main(args: Array[String]) {
     val config = new Config(args)
 
-    val executor = Executors.newFixedThreadPool(config.numThreads)
-    val allDone = new CountDownLatch(config.numThreads)
-
     // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
     ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
     Thread.sleep(500)
@@ -51,7 +48,7 @@ object ReplayLogProducer extends Logging {
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
     var threadList = List[ZKConsumerThread]()
-    for ((topic, streamList) <- topicMessageStreams)
+    for (streamList <- topicMessageStreams.values)
       for (stream <- streamList)
         threadList ::= new ZKConsumerThread(config, stream)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 9a059df..01d3aa8 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -107,7 +107,7 @@ object ReplicaVerificationTool extends Logging {
       Pattern.compile(regex)
     }
     catch {
-      case e: PatternSyntaxException =>
+      case _: PatternSyntaxException =>
         throw new RuntimeException(regex + " is an invalid regex.")
     }
 
@@ -151,14 +151,13 @@ object ReplicaVerificationTool extends Logging {
           topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
           .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
     debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
-    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
-      topicMetadataResponse =>
-        topicMetadataResponse.partitionsMetadata.map(
-          partitionMetadata =>
-            (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))
-    ).groupBy(_._2)
-     .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map {
-        case(topicAndPartition, leaderId) => topicAndPartition })
+    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
+      topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
+        (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+      }
+    }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
+       topicAndPartition
+     })
     debug("Leaders per broker: " + leadersPerBroker)
 
     val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
@@ -236,8 +235,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
   }
 
   private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
-    offsetResponse.partitionErrorAndOffsets.filter {
-      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != Errors.NONE.code
+    offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
+      partitionOffsetsResponse.error != Errors.NONE.code
     }.mkString
   }
 

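The ReplicaVerificationTool hunk reshapes a flatMap/groupBy/mapValues chain into nested braces and drops the unused leader id with `_` when projecting partitions. A hypothetical sketch of that grouping step:

    object LeadersPerBrokerSketch {
      def main(args: Array[String]): Unit = {
        val partitionLeaders = Seq(("topic-0", 1), ("topic-1", 2), ("topic-2", 1))
        // group (partition, leaderId) pairs by leader, keep only the partitions
        val leadersPerBroker: Map[Int, Seq[String]] =
          partitionLeaders.groupBy(_._2)
            .mapValues(_.map { case (partition, _) => partition }).toMap
        println(leadersPerBroker) // e.g. Map(1 -> List(topic-0, topic-2), 2 -> List(topic-1))
      }
    }
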
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 1c059bb..99b5aae 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -271,8 +271,8 @@ object CoreUtils extends Logging {
    */
   def duplicates[T](s: Traversable[T]): Iterable[T] = {
     s.groupBy(identity)
-      .map{ case (k,l) => (k,l.size)}
-      .filter{ case (k,l) => l > 1 }
+      .map { case (k, l) => (k, l.size)}
+      .filter { case (_, l) => l > 1 }
       .keys
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index b43b4b1..896c300 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -52,7 +52,7 @@ class FileLock(val file: File) extends Logging {
         flock = channel.tryLock()
         flock != null
       } catch {
-        case e: OverlappingFileLockException => false
+        case _: OverlappingFileLockException => false
       }
     }
   }
@@ -77,4 +77,4 @@ class FileLock(val file: File) extends Logging {
       channel.close()
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index aa120ab..5d2549e 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -61,12 +61,10 @@ object Mx4jLoader extends Logging {
       true
     }
     catch {
-	  case e: ClassNotFoundException => {
+	  case _: ClassNotFoundException =>
         info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
-      }
-      case e: Throwable => {
+      case e: Throwable =>
         warn("Could not start register mbean in JMX", e)
-      }
     }
     false
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 31e8a92..369bb23 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -58,13 +58,13 @@ object ReplicationUtils extends Logging {
           (expectedLeader,writtenLeader) match {
             case (Some(expectedLeader),Some(writtenLeader)) =>
               if(expectedLeader == writtenLeader)
-                return (true,writtenStat.getVersion())
+                return (true, writtenStat.getVersion())
             case _ =>
           }
         case None =>
       }
     } catch {
-      case e1: Exception =>
+      case _: Exception =>
     }
     (false,-1)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index f57245f..9600b0a 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -181,7 +181,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
   /**
    * Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ...
    */
-  def getMap(name: String, valid: String => Boolean = s => true): Map[String, String] = {
+  def getMap(name: String, valid: String => Boolean = _ => true): Map[String, String] = {
     try {
       val m = CoreUtils.parseCsvMap(getString(name, ""))
       m.foreach {
@@ -208,7 +208,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
       CompressionCodec.getCompressionCodec(prop.toInt)
     }
     catch {
-      case nfe: NumberFormatException =>
+      case _: NumberFormatException =>
         CompressionCodec.getCompressionCodec(prop)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 80a9f1a..787cb8f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -237,7 +237,7 @@ class ZkUtils(val zkClient: ZkClient,
       createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
       proposedClusterId
     } catch {
-      case e: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
     }
   }
@@ -389,7 +389,7 @@ class ZkUtils(val zkClient: ZkClient,
                                                       isSecure)
       zkCheckedEphemeral.create()
     } catch {
-      case e: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
                 + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
                 + "else you have shutdown this broker and restarted it faster than the zookeeper "
@@ -445,7 +445,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       ZkPath.createEphemeral(zkClient, path, data, acls)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createEphemeral(zkClient, path, data, acls)
     }
@@ -465,7 +465,7 @@ class ZkUtils(val zkClient: ZkClient,
         try {
           storedData = readData(path)._1
         } catch {
-          case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
+          case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -484,7 +484,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       ZkPath.createPersistent(zkClient, path, data, acls)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createPersistent(zkClient, path, data, acls)
     }
@@ -503,12 +503,12 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.writeData(path, data)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         try {
           ZkPath.createPersistent(zkClient, path, data, acls)
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             zkClient.writeData(path, data)
         }
     }
@@ -573,7 +573,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.writeData(path, data)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createEphemeral(zkClient, path, data, acls)
     }
@@ -583,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.delete(path)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
         false
@@ -599,7 +599,7 @@ class ZkUtils(val zkClient: ZkClient,
       zkClient.delete(path, expectedVersion)
       true
     } catch {
-      case e: ZkBadVersionException => false
+      case _: ZkBadVersionException => false
     }
   }
 
@@ -607,7 +607,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.deleteRecursive(path)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
     }
@@ -624,7 +624,7 @@ class ZkUtils(val zkClient: ZkClient,
     val dataAndStat = try {
                         (Some(zkClient.readData(path, stat)), stat)
                       } catch {
-                        case e: ZkNoNodeException =>
+                        case _: ZkNoNodeException =>
                           (None, stat)
                       }
     dataAndStat
@@ -642,7 +642,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.getChildren(path)
     } catch {
-      case e: ZkNoNodeException => Nil
+      case _: ZkNoNodeException => Nil
     }
   }
 
@@ -754,7 +754,7 @@ class ZkUtils(val zkClient: ZkClient,
           updatePersistentPath(zkPath, jsonData)
           debug("Updated partition reassignment path with %s".format(jsonData))
         } catch {
-          case nne: ZkNoNodeException =>
+          case _: ZkNoNodeException =>
             createPersistentPath(zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
           case e2: Throwable => throw new AdminOperationException(e2.toString)
@@ -835,7 +835,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       writeToZk
     } catch {
-      case e1: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         makeSurePersistentPathExists(path)
         writeToZk
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index ce91a30..f13f59f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -111,7 +111,6 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
   @Test
   def testDescribeConsumerGroupForNonExistentGroup() {
     val nonExistentGroup = "non" + groupId
-    val sum = client.describeConsumerGroup(nonExistentGroup).consumers
     assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 55e8e4f..8502ae0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -149,11 +149,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
 
-    for (i <- 0 until producerCount)
+    for (_ <- 0 until producerCount)
       producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
         maxBlockMs = 3000,
         acks = 1)
-    for (i <- 0 until consumerCount)
+    for (_ <- 0 until consumerCount)
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
@@ -339,7 +339,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
     } catch {
-      case e: TimeoutException => //expected
+      case _: TimeoutException => //expected
     }
   }
 
@@ -517,7 +517,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case _: TopicAuthorizationException => //expected
     }
   }
 
@@ -595,7 +595,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case _: TopicAuthorizationException => //expected
     } finally consumer.close()
   }
 
@@ -813,11 +813,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
   }
 
-  private def removeAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
-    servers.head.apis.authorizer.get.removeAcls(acls, resource)
-    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
-  }
-
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
                              numRecords: Int = 1,
                              startingOffset: Int = 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 102b7cf..732b99f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -280,7 +280,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
       try {
         consumer.poll(50)
       } catch {
-        case e: WakeupException => // ignore for shutdown
+        case _: WakeupException => // ignore for shutdown
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 816f36a..b5aaaf4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -142,12 +142,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
-        case iae: IllegalArgumentException => // this is ok
-        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+        case _: IllegalArgumentException => // this is ok
       }
 
       // non-blocking send a list of records
-      for (i <- 1 to numRecords)
+      for (_ <- 1 to numRecords)
         producer.send(record0, callback)
 
       // check that all messages have been acked via offset
@@ -234,7 +233,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
-      for (i <- 1 to numRecords)
+      for (_ <- 1 to numRecords)
         producer.send(record0)
       val response0 = producer.send(record0)
 
@@ -328,8 +327,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes))
       fail("Should not allow sending a record to a partition not present in the metadata")
     } catch {
-      case ke: KafkaException => // this is ok
-      case e: Throwable => fail("Only expecting KafkaException", e)
+      case _: KafkaException => // this is ok
     }
 
     AdminUtils.addPartitions(zkUtils, topic, 2)
@@ -370,8 +368,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     try {
       TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
-      for (i <- 0 until 50) {
-        val responses = (0 until numRecords) map (i => producer.send(record))
+      for (_ <- 0 until 50) {
+        val responses = (0 until numRecords) map (_ => producer.send(record))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         producer.flush()
         assertTrue("All requests are complete.", responses.forall(_.isDone()))
@@ -389,15 +387,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     // create topic
     val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
     val leader0 = leaders(0)
-    val leader1 = leaders(1)
 
     // create record
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from caller thread.
-    for (i <- 0 until 50) {
+    for (_ <- 0 until 50) {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
-      val responses = (0 until numRecords) map (i => producer.send(record0))
+      val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       producer.close(0, TimeUnit.MILLISECONDS)
       responses.foreach { future =>
@@ -436,7 +433,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         // Trigger another batch in accumulator before close the producer. These messages should
         // not be sent.
         if (sendRecords)
-          (0 until numRecords) foreach (i => producer.send(record))
+          (0 until numRecords) foreach (_ => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence of the close call.
         producer.close(0, TimeUnit.MILLISECONDS)
         // Test close with non zero timeout. Should not block at all.

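In the producer send tests the second `case e: Throwable => fail(...)` branch is removed: once only the expected exception type is caught, any other throwable propagates out of the test method and fails it anyway, so the extra branch (with its unused binding) adds nothing. A hedged sketch of the idiom, with a hypothetical sendBrokenRecord and a local stand-in for JUnit's fail:

    object ExpectedExceptionSketch {
      private def fail(message: String): Nothing = throw new AssertionError(message) // stand-in for org.junit.Assert.fail
      private def sendBrokenRecord(): Unit = throw new IllegalArgumentException("topic must not be null")

      def testRejectsRecordWithoutTopic(): Unit = {
        try {
          sendBrokenRecord()
          fail("Should not allow sending a record without topic")
        } catch {
          case _: IllegalArgumentException => // expected; anything else propagates and fails the test
        }
      }

      def main(args: Array[String]): Unit = testRejectsRecordWithoutTopic()
    }
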
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4e6c740..479e749 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -276,7 +276,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
 
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
-    servers.foreach { s =>
+    servers.foreach { _ =>
       TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index b26b242..d15a01d 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
 object FixedPortTestUtils {
   def choosePorts(count: Int): Seq[Int] = {
     try {
-      val sockets = (0 until count).map(i => new ServerSocket(0))
+      val sockets = (0 until count).map(_ => new ServerSocket(0))
       val ports = sockets.map(_.getLocalPort())
       sockets.foreach(_.close())
       ports

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ca020a6..83280dc 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -65,9 +65,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.putAll(consumerSecurityProps)
-    for (i <- 0 until producerCount)
+    for (_ <- 0 until producerCount)
       producers += createNewProducer
-    for (i <- 0 until consumerCount) {
+    for (_ <- 0 until consumerCount) {
       consumers += createNewConsumer
     }
 

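The test-harness hunks apply the same idea to loop variables and lambda parameters: when an index exists only to repeat an action, `for (_ <- 0 until n)` and `(0 until n).map(_ => ...)` make that explicit. A sketch:

    object RepeatSketch {
      def main(args: Array[String]): Unit = {
        val producerCount = 3
        val producers = scala.collection.mutable.Buffer.empty[String]
        // before: for (i <- 0 until producerCount) ... with `i` never referenced
        for (_ <- 0 until producerCount)
          producers += "producer-" + producers.size
        val responses = (0 until producers.size).map(_ => "ack")
        println(producers.mkString(", ") + " -> " + responses.size + " acks")
      }
    }
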
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index d18dc3a..aefe5bd 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -782,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // create a group of consumers, subscribe the consumers to all the topics and start polling
     // for the topic partition assignment
-    val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
+    val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
     try {
       validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
 
@@ -862,10 +862,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       testProducer.send(null, null)
       fail("Should not allow sending a null record")
     } catch {
-      case e: Throwable => {
+      case _: Throwable =>
         assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
         assertEquals("Interceptor should not receive metadata with an exception when record is null", 0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
-      }
     }
 
     // create consumer with interceptor
@@ -1222,7 +1221,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
     TestUtils.waitUntilTrue(() => {
-      val records = consumer0.poll(50)
+      consumer0.poll(50)
       consumer0.assignment() == newAssignment.asJava
     }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
 
@@ -1335,7 +1334,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                               subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
     assertTrue(consumerCount <= subscriptions.size)
     val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-    for (i <- 0 until consumerCount)
+    for (_ <- 0 until consumerCount)
       consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
     consumers ++= consumerGroup
 
@@ -1364,7 +1363,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                                    topicsToSubscribe: List[String],
                                                    subscriptions: Set[TopicPartition]): Unit = {
     assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
-    for (i <- 0 until numOfConsumersToAdd) {
+    for (_ <- 0 until numOfConsumersToAdd) {
       val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
       consumerGroup += newConsumer
       consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe)
@@ -1415,7 +1414,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                                             rebalanceListener: ConsumerRebalanceListener): Unit = {
     consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
     TestUtils.waitUntilTrue(() => {
-      val records = consumer.poll(50)
+      consumer.poll(50)
       consumer.assignment() == subscriptions.asJava
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
   }

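Two related cleanups in PlaintextConsumerTest: when only one side of a destructured tuple is needed, the
unused position becomes _, and a call made purely for its side effect (poll here only drives the rebalance)
no longer has its result bound at all. A small illustrative sketch, not the test code itself:

    object DestructuringSketch {
      // Hypothetical stand-in for an API that returns more than the caller needs.
      def describeGroup(): (String, Option[Seq[Int]]) = ("Stable", Some(Seq(0, 1)))

      // Bind only the component that is actually used; _ marks the discarded one.
      val (_, assignments) = describeGroup()
    }
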
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 734eb66..a75e7c7 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -39,7 +39,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       createNewProducerWithNoSerializer(brokerList)
       fail("Instantiating a producer without specifying a serializer should cause a ConfigException")
     } catch {
-      case ce : ConfigException => // this is ok
+      case _ : ConfigException => // this is ok
     }
 
     // create a producer with explicit serializers should succeed
@@ -67,7 +67,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       producer.send(record5)
       fail("Should have gotten a SerializationException")
     } catch {
-      case se: SerializationException => // this is ok
+      case _: SerializationException => // this is ok
     }
   }
 

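The expected-exception blocks above follow the same rule: when the caught value is never inspected,
case _: SomeException => keeps the type check without an unused name. Where the value is needed, the named
form stays, as in the TestOffsetManager hunks further down, which keep e2 because it appears in the log
message. A self-contained sketch of the anonymous form (the exception type is only an example):

    object ExpectedExceptionSketch {
      def parsePort(s: String): Option[Int] =
        try Some(s.toInt)
        catch {
          case _: NumberFormatException => None // the type matters, the exception value does not
        }
    }
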
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5994a1d..8d676d1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -88,7 +88,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     scheduler.start
 
     // rolling bounce brokers
-    for (i <- 0 until numServers) {
+    for (_ <- 0 until numServers) {
       for (server <- servers) {
         server.shutdown()
         server.awaitShutdown()
@@ -143,7 +143,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
         futures.map(_.get)
         sent += numRecords
       } catch {
-        case e : Exception => failed = true
+        case _ : Exception => failed = true
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 14807bc..cebfb04 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -102,7 +102,6 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
 
   private val orgName = config.getProperty(MiniKdc.OrgName)
   private val orgDomain = config.getProperty(MiniKdc.OrgDomain)
-  private val dnString = s"dc=$orgName,dc=$orgDomain"
   private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}"
   private val krb5conf = new File(workDir, "krb5.conf")
 
@@ -163,7 +162,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
     val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
     partition.setId(orgName)
     partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
-    val dn = new Dn(dnString)
+    val dn = new Dn(s"dc=$orgName,dc=$orgDomain")
     partition.setSuffixDn(dn)
     ds.addPartition(partition)
 
@@ -207,7 +206,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
     val kerberosConfig = new KerberosConfig
     kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
     kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
-    kerberosConfig.setSearchBaseDn(dnString)
+    kerberosConfig.setSearchBaseDn(s"dc=$orgName,dc=$orgDomain")
     kerberosConfig.setPaEncTimestampRequired(false)
     kdc = new KdcServer(kerberosConfig)
     kdc.setDirectoryService(ds)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 6556100..51f02d1 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -301,7 +301,7 @@ object TestLogCleaning {
           consumedWriter.newLine()
         }
       } catch {
-        case e: ConsumerTimeoutException => 
+        case _: ConsumerTimeoutException =>
       }
     }
     consumedWriter.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8adc7e2..f5cee0c 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -69,7 +69,6 @@ object StressTestLog {
   abstract class WorkerThread extends Thread {
     override def run() {
       try {
-        var offset = 0
         while(running.get)
           work()
       } catch {
@@ -107,7 +106,7 @@ object StressTestLog {
           case _ =>
         }
       } catch {
-        case e: OffsetOutOfRangeException => // this is okay
+        case _: OffsetOutOfRangeException => // this is okay
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestCrcPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestCrcPerformance.scala b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
index 0c1e1ad..daeecbd 100755
--- a/core/src/test/scala/other/kafka/TestCrcPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
@@ -18,7 +18,6 @@ package kafka.log
 
 import java.util.Random
 import kafka.message._
-import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Utils
 
 object TestCrcPerformance {
@@ -28,21 +27,18 @@ object TestCrcPerformance {
       Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size")
     val numMessages = args(0).toInt
     val messageSize = args(1).toInt
-    //val numMessages = 100000000
-    //val messageSize = 32
 
-    val dir = TestUtils.tempDir()
     val content = new Array[Byte](messageSize)
     new Random(1).nextBytes(content)
 
     // create message test
     val start = System.nanoTime
-    for(i <- 0 until numMessages) {
+    for (_ <- 0 until numMessages)
       new Message(content)
-    }
-    val ellapsed = System.nanoTime - start
-    println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, ellapsed/(1000.0*1000.0*1000.0),
-      ellapsed / numMessages.toDouble))
+
+    val elapsed = System.nanoTime - start
+    println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, elapsed / (1000.0*1000.0*1000.0),
+      elapsed / numMessages.toDouble))
 
   }
 }

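Besides the ellapsed -> elapsed spelling fix, this hunk removes a binding whose initializer had a side
effect: the unused dir was created with TestUtils.tempDir(), so the benchmark no longer creates a scratch
directory it never touched. That is one of the few removals in this patch that changes behaviour at all
(harmlessly here). A toy illustration of the distinction, with made-up values:

    object UnusedValSideEffects {
      private val pure  = List(1, 2, 3)                       // never read: deleting it changes nothing
      private val noisy = { println("allocating"); Array(1) } // never read, but deleting it drops the println
    }
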
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index ab807a1..72c7f28 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -33,12 +33,13 @@ object TestKafkaAppender extends Logging {
     try {
       PropertyConfigurator.configure(args(0))
     } catch {
-      case e: Exception => System.err.println("KafkaAppender could not be initialized ! Exiting..")
-      e.printStackTrace()
-      System.exit(1)
+      case e: Exception =>
+        System.err.println("KafkaAppender could not be initialized ! Exiting..")
+        e.printStackTrace()
+        System.exit(1)
     }
 
-    for(i <- 1 to 10)
+    for (_ <- 1 to 10)
       info("test")    
   }
 }

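Worth noting that the TestKafkaAppender change is purely cosmetic: in Scala, every statement between a
case ... => and the next case (or the closing brace) already belongs to that case's body, so
e.printStackTrace() and System.exit(1) were only ever executed on the exception path. The re-indentation
just makes the scoping visible. A compact illustration:

    object CaseBodySketch {
      def classify(x: Any): String = x match {
        case n: Int =>
          val doubled = n * 2        // both of these lines belong to the Int case,
          s"int doubled to $doubled" // whatever indentation they are given
        case _ =>
          "something else"
      }
    }
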
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index db281bf..6bd8e4f 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -101,7 +101,8 @@ object TestLinearWriteSpeed {
     val rand = new Random
     rand.nextBytes(buffer.array)
     val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
-    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*)
+    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec,
+      messages = (0 until numMessages).map(_ => new Message(new Array[Byte](messageSize))): _*)
     
     val writables = new Array[Writable](numFiles)
     val scheduler = new KafkaScheduler(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9445191..9db2ffd 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -94,7 +94,7 @@ object TestOffsetManager {
         offset += 1
       }
       catch {
-        case e1: ClosedByInterruptException =>
+        case _: ClosedByInterruptException =>
           offsetsChannel.disconnect()
         case e2: IOException =>
           println("Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s.".format(id, offsetsChannel.host, offsetsChannel.port, groupId, e2))
@@ -158,7 +158,7 @@ object TestOffsetManager {
           }
         }
         catch {
-          case e1: ClosedByInterruptException =>
+          case _: ClosedByInterruptException =>
             channel.disconnect()
             channels.remove(coordinatorId)
           case e2: IOException =>
@@ -168,7 +168,7 @@ object TestOffsetManager {
         }
       }
       catch {
-        case e: IOException =>
+        case _: IOException =>
           println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
           metadataChannel.disconnect()
           println("Creating new query channel.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index ba89fc8..6ccac29 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -104,7 +104,7 @@ object TestPurgatoryPerformance {
     val latch = new CountDownLatch(numRequests)
     val start = System.currentTimeMillis
     val rand = new Random()
-    val keys = (0 until numKeys).map(i => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
+    val keys = (0 until numKeys).map(_ => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
     @volatile var requestArrivalTime = start
     @volatile var end = 0L
     val generator = new Runnable {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 763e4ec..0f846e1 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -69,8 +69,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
       AdminUtils.addPartitions(zkUtils, "Blah", 1)
       fail("Topic should not exist")
     } catch {
-      case e: AdminOperationException => //this is good
-      case e2: Throwable => throw e2
+      case _: AdminOperationException => //this is good
     }
   }
 
@@ -80,8 +79,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
       AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
       fail("Add partitions should fail")
     } catch {
-      case e: AdminOperationException => //this is good
-      case e2: Throwable => throw e2
+      case _: AdminOperationException => //this is good
     }
   }
 

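The AddPartitionsTest hunks remove more than an unused name: case e2: Throwable => throw e2 was a no-op,
because a partial catch that only matches AdminOperationException already lets every other throwable
propagate unchanged, original stack trace included. A small sketch of the equivalence (exception types are
illustrative):

    object PartialCatchSketch {
      def attempt(op: () => Unit): Boolean =
        try { op(); true }
        catch {
          case _: IllegalArgumentException => false
          // No catch-all needed: anything else propagates to the caller, exactly as an
          // explicit case e: Throwable => throw e would have done.
        }
    }
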
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index ddfbb51..ff86693 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -250,7 +250,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         ConfigCommand.parseEntity(opts)
         fail("Did not fail with invalid argument list")
       } catch {
-        case e: IllegalArgumentException => // expected exception
+        case _: IllegalArgumentException => // expected exception
       }
     }
 
@@ -315,7 +315,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
           "--alter", "--add-config", "a=b,c=d")
       fail("Did not fail with invalid client-id")
     } catch {
-      case e: InvalidConfigException => // expected
+      case _: InvalidConfigException => // expected
     }
 
     checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ccb3618..d1fcbc0 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -164,7 +164,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   @Test
   def testAddPartitionDuringDeleteTopic() {
     val topic = "test"
-    val topicAndPartition = TopicAndPartition(topic, 0)
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
@@ -208,7 +207,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
       AdminUtils.deleteTopic(zkUtils, "test2")
       fail("Expected UnknownTopicOrPartitionException")
     } catch {
-      case e: UnknownTopicOrPartitionException => // expected exception
+      case _: UnknownTopicOrPartitionException => // expected exception
     }
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
@@ -270,7 +269,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
       fail("Expected TopicAlreadyMarkedForDeletionException")
     }
     catch {
-      case e: TopicAlreadyMarkedForDeletionException => // expected exception
+      case _: TopicAlreadyMarkedForDeletionException => // expected exception
     }
 
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -300,7 +299,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
     var counter = 0
-    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
       log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
       counter += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 3691919..39bcb7a 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -87,7 +87,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
     // action/test
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
+        val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.filter(_.group == group).size == 1 &&
         assignments.get.filter(_.group == group).head.consumerId.isDefined
@@ -113,7 +113,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
     // action/test
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
+        val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.filter(_.group == group).size == 2 &&
         assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 &&

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 2a3724e..90a354e 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -45,7 +45,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging wi
     kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
 
     val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
-    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+    val (proposedAssignment, _) = ReassignPartitionsCommand.generateAssignment(zkUtils,
       rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
 
     val assignment = proposedAssignment map { case (topicPartition, replicas) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
index fff3e7b..b71b00b 100644
--- a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
@@ -34,7 +34,7 @@ class ApiUtilsTest extends JUnitSuite {
   @Test
   def testShortStringNonASCII() {
     // Random-length strings
-    for(i <- 0 to 100) {
+    for(_ <- 0 to 100) {
       // Since we're using UTF-8 encoding, each encoded byte will be one to four bytes long 
       val s: String = ApiUtilsTest.rnd.nextString(math.abs(ApiUtilsTest.rnd.nextInt()) % (Short.MaxValue / 4))  
       val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
@@ -47,7 +47,7 @@ class ApiUtilsTest extends JUnitSuite {
   @Test
   def testShortStringASCII() {
     // Random-length strings
-    for(i <- 0 to 100) {
+    for(_ <- 0 to 100) {
       val s: String = TestUtils.randomString(math.abs(ApiUtilsTest.rnd.nextInt()) % Short.MaxValue)  
       val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
       ApiUtils.writeShortString(bb, s)
@@ -68,17 +68,13 @@ class ApiUtilsTest extends JUnitSuite {
       ApiUtils.shortStringLength(s2)
       fail
     } catch {
-      case e: KafkaException => {
-        // ok
-      }
+      case _: KafkaException => // ok
     }
     try {
       ApiUtils.writeShortString(bb, s2)
       fail
     } catch {
-      case e: KafkaException => {
-        // ok
-      }
+      case _: KafkaException => // ok
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index f8fbae7..16fe788 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,7 +23,6 @@ import kafka.common._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
 
-import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 
 import java.nio.ByteBuffer
@@ -37,10 +36,6 @@ import org.junit.Assert._
 object SerializationTestUtils {
   private val topic1 = "test1"
   private val topic2 = "test2"
-  private val leader1 = 0
-  private val isr1 = List(0, 1, 2)
-  private val leader2 = 0
-  private val isr2 = List(0, 2, 3)
   private val partitionDataFetchResponse0 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
   private val partitionDataFetchResponse1 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("second message".getBytes)))
   private val partitionDataFetchResponse2 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("third message".getBytes)))
@@ -84,37 +79,6 @@ object SerializationTestUtils {
   private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
                              new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
                              new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
-  private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
-
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3)
-  private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
-  private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
-  private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
-
-  private val leaderAndIsr0 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
-  private val leaderAndIsr1 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
-  private val leaderAndIsr2 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
-  private val leaderAndIsr3 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
-
-  private val leaderIsrAndControllerEpoch0 = new LeaderIsrAndControllerEpoch(leaderAndIsr0, controllerEpoch = 0)
-  private val leaderIsrAndControllerEpoch1 = new LeaderIsrAndControllerEpoch(leaderAndIsr1, controllerEpoch = 0)
-  private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0)
-  private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0)
-
-  private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet)
-  private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet)
-  private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet)
-  private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet)
-
-  private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map(
-    TopicAndPartition(topic1,0) -> partitionStateInfo0,
-    TopicAndPartition(topic1,1) -> partitionStateInfo1,
-    TopicAndPartition(topic1,2) -> partitionStateInfo2,
-    TopicAndPartition(topic1,3) -> partitionStateInfo3
-  )
 
   def createTestProducerRequest: ProducerRequest = {
     new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index 26154f2..2d20b1e 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -40,7 +40,7 @@ class ConfigTest {
         fail("Should throw InvalidClientIdException.")
       }
       catch {
-        case e: InvalidConfigException => "This is good."
+        case _: InvalidConfigException => // This is good
       }
     }
 
@@ -51,7 +51,7 @@ class ConfigTest {
         ProducerConfig.validateClientId(validClientIds(i))
       }
       catch {
-        case e: Exception => fail("Should not throw exception.")
+        case _: Exception => fail("Should not throw exception.")
       }
     }
   }
@@ -70,7 +70,7 @@ class ConfigTest {
         fail("Should throw InvalidGroupIdException.")
       }
       catch {
-        case e: InvalidConfigException => "This is good."
+        case _: InvalidConfigException => // This is good
       }
     }
 
@@ -81,7 +81,7 @@ class ConfigTest {
         ConsumerConfig.validateGroupId(validGroupIds(i))
       }
       catch {
-        case e: Exception => fail("Should not throw exception.")
+        case _: Exception => fail("Should not throw exception.")
       }
     }
   }

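In ConfigTest the old arms ended in the bare string literal "This is good." - a value that was built and
immediately discarded, which is exactly the kind of dead expression this patch is after. With the literal
replaced by a comment the arm is empty and evaluates to Unit, which is equivalent here because the
surrounding try/catch is only used as a statement. A sketch of the resulting shape, with illustrative names
and exception types:

    object EmptyCaseBodySketch {
      def mustReject(candidate: String, validate: String => Unit): Unit =
        try {
          validate(candidate)
          throw new AssertionError("expected the validator to reject " + candidate)
        } catch {
          case _: IllegalArgumentException => // expected: nothing to do, the arm just yields ()
        }
    }
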
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
index 66549af..39eb315 100644
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -28,7 +28,7 @@ class TopicTest {
     val invalidTopicNames = new ArrayBuffer[String]()
     invalidTopicNames += ("", ".", "..")
     var longName = "ATCG"
-    for (i <- 1 to 6)
+    for (_ <- 1 to 6)
       longName += longName
     invalidTopicNames += longName
     invalidTopicNames += longName.drop(6)
@@ -43,7 +43,7 @@ class TopicTest {
         fail("Should throw InvalidTopicException.")
       }
       catch {
-        case e: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
+        case _: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
       }
     }
 
@@ -54,7 +54,7 @@ class TopicTest {
         Topic.validate(validTopicNames(i))
       }
       catch {
-        case e: Exception => fail("Should not throw exception.")
+        case _: Exception => fail("Should not throw exception.")
       }
     }
   }