You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/25 02:17:44 UTC

[1/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables

Repository: kafka
Updated Branches:
  refs/heads/trunk 1fc450fdb -> d09267383


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 7ef4550..d8f0de4 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -40,8 +40,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
       zkUtils.createPersistentPath(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
-      case configException: ConfigException =>
-      case exception: Throwable => fail("Should have thrown ConfigException")
+      case _: ConfigException =>
     }
     zkUtils.close()
   }
@@ -50,13 +49,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    try {
-      ZkPath.resetNamespaceCheckedState
-      zkUtils.createPersistentPath(path)
-    } catch {
-      case exception: Throwable => fail("Failed to create persistent path")
-    }
-
+    ZkPath.resetNamespaceCheckedState
+    zkUtils.createPersistentPath(path)
     assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
     zkUtils.close()
   }
@@ -70,8 +64,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
       zkUtils.makeSurePersistentPathExists(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
-      case configException: ConfigException =>
-      case exception: Throwable => fail("Should have thrown ConfigException")
+      case _: ConfigException =>
     }
     zkUtils.close()
   }
@@ -80,13 +73,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    try {
-      ZkPath.resetNamespaceCheckedState
-      zkUtils.makeSurePersistentPathExists(path)
-    } catch {
-      case exception: Throwable => fail("Failed to create persistent path")
-    }
-
+    ZkPath.resetNamespaceCheckedState
+    zkUtils.makeSurePersistentPathExists(path)
     assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
     zkUtils.close()
   }
@@ -100,8 +88,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
       zkUtils.createEphemeralPathExpectConflict(path, "somedata")
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
-      case configException: ConfigException =>
-      case exception: Throwable => fail("Should have thrown ConfigException")
+      case _: ConfigException =>
     }
     zkUtils.close()
   }
@@ -110,13 +97,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    try {
-      ZkPath.resetNamespaceCheckedState
-      zkUtils.createEphemeralPathExpectConflict(path, "somedata")
-    } catch {
-      case exception: Throwable => fail("Failed to create ephemeral path")
-    }
-
+    ZkPath.resetNamespaceCheckedState
+    zkUtils.createEphemeralPathExpectConflict(path, "somedata")
     assertTrue("Failed to create ephemeral path", zkUtils.pathExists(path))
     zkUtils.close()
   }
@@ -131,8 +113,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
       zkUtils.createSequentialPersistentPath(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
-      case configException: ConfigException =>
-      case exception: Throwable => fail("Should have thrown ConfigException")
+      case _: ConfigException =>
     }
     zkUtils.close()
   }
@@ -141,15 +122,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-
-    var actualPath: String = ""
-    try {
-      ZkPath.resetNamespaceCheckedState
-      actualPath = zkUtils.createSequentialPersistentPath(path)
-    } catch {
-      case exception: Throwable => fail("Failed to create persistent path")
-    }
-
+    ZkPath.resetNamespaceCheckedState
+    val actualPath = zkUtils.createSequentialPersistentPath(path)
     assertTrue("Failed to create persistent path", zkUtils.pathExists(actualPath))
     zkUtils.close()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
index 6eaee70..bd0b257 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
@@ -39,7 +39,7 @@ object ZkFourLetterWords {
       outStream.write("stat".getBytes)
       outStream.flush()
     } catch {
-      case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw")
+      case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw", e)
     } finally {
       sock.close
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 25e0620..7dcd63d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -52,7 +52,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @SuppressWarnings("unchecked")
     private <K, V> KeyValueStore<K, V> createStore(final ProcessorContext context, final Class<K> keyClass, final Class<V> valueClass, final boolean useContextSerdes, final boolean enableCaching) {
 
-        Stores.PersistentKeyValueFactory<?, ?> factory = null;
+        Stores.PersistentKeyValueFactory<?, ?> factory;
         if (useContextSerdes) {
             factory = Stores
                     .create("my-store")


[2/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables

Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 653b40c..b7fc657 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -52,7 +52,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+    topicInfos = configs.map(_ => new PartitionTopicInfo(topic,
       0,
       queue,
       new AtomicLong(consumedOffset),
@@ -77,7 +77,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
                                                     clientId = "")
-    val receivedMessages = (0 until 5).map(i => iter.next.message).toList
+    val receivedMessages = (0 until 5).map(_ => iter.next.message)
 
     assertFalse(iter.hasNext)
     assertEquals(0, queue.size) // Shutdown command has been consumed.
@@ -101,17 +101,14 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
       new FailDecoder(),
       clientId = "")
 
-    val receivedMessages = (0 until 5).map{ i =>
+    (0 until 5).foreach { i =>
       assertTrue(iter.hasNext)
       val message = iter.next
       assertEquals(message.offset, i + consumedOffset)
 
-      try {
-        message.message // should fail
-      }
+      try message.message // should fail
       catch {
-        case e: UnsupportedOperationException => // this is ok
-        case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+        case _: UnsupportedOperationException => // this is ok
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 818229c..9e568f8 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -35,49 +35,49 @@ class PartitionAssignorTest extends Logging {
     val assignor = new RoundRobinAssignor
 
     /** various scenarios with only wildcard consumers */
-    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
       val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
       val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
 
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
-      }).toSeq:_*)
+      }):_*)
       
-      val subscriptions = Map((1 to consumerCount).map(consumer => {
+      val subscriptions = Map((1 to consumerCount).map { consumer =>
         val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
         ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
-      }).toSeq:_*)
+      }:_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
       val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
       EasyMock.replay(zkUtils.zkClient)
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
-    })
+    }
   }
 
   @Test
   def testRangePartitionAssignor() {
     val assignor = new RangeAssignor
-    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
       val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
       val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
 
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
-      }).toSeq:_*)
+      }):_*)
 
-      val subscriptions = Map((1 to consumerCount).map(consumer => {
+      val subscriptions = Map((1 to consumerCount).map { consumer =>
         val streamCounts = Map((1 to topicCount).map(topic => {
             val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
             ("topic-" + topic, streamCount)
-          }).toSeq:_*)
+          }):_*)
         ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
-      }).toSeq:_*)
+      }:_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
       val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
       EasyMock.replay(zkUtils.zkClient)
 
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
-    })
+    }
   }
 }
 
@@ -163,7 +163,7 @@ private object PartitionAssignorTest extends Logging {
 
   private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils,
                               verifyAssignmentIsUniform: Boolean = false) {
-    val assignments = scenario.subscriptions.map{ case(consumer, subscription)  =>
+    val assignments = scenario.subscriptions.map { case (consumer, _)  =>
       val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils)
       assignor.assign(ctx).get(consumer)
     }
@@ -200,7 +200,7 @@ private object PartitionAssignorTest extends Logging {
   /** For each consumer stream, count the number of partitions that it owns. */
   private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = {
     val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]()
-    assignment.foreach { case (topicPartition, owner) =>
+    assignment.foreach { case (_, owner) =>
       val updatedCount = ownedCounts.getOrElse(owner, 0) + 1
       ownedCounts.put(owner, updatedCount)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index b054794..df80d1d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -81,13 +81,12 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     // no messages to consume, we should hit timeout;
     // also the iterator should support re-entrant, so loop it twice
-    for (i <- 0 until 2) {
+    for (_ <- 0 until 2) {
       try {
         getMessages(topicMessageStreams0, nMessages * 2)
         fail("should get an exception")
       } catch {
-        case e: ConsumerTimeoutException => // this is ok
-        case e: Throwable => throw e
+        case _: ConsumerTimeoutException => // this is ok
       }
     }
 
@@ -147,7 +146,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+    zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
                         sendMessages(servers, topic, nMessages, 1)
@@ -164,10 +163,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     // call createMesssageStreams twice should throw MessageStreamsExistException
     try {
-      val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+      zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
       fail("Should fail with MessageStreamsExistException")
     } catch {
-      case e: MessageStreamsExistException => // expected
+      case _: MessageStreamsExistException => // expected
     }
 
     zkConsumerConnector1.shutdown
@@ -235,7 +234,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
+    zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
     val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
                         sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
@@ -304,10 +303,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
       zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     var receivedMessages: List[String] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
+    for (messageStreams <- topicMessageStreams.values) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
-        for (i <- 0 until nMessages * 2) {
+        for (_ <- 0 until nMessages * 2) {
           assertTrue(iterator.hasNext())
           val message = iterator.next().message
           receivedMessages ::= message
@@ -390,7 +389,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     // Register consumer rebalance listener
     val rebalanceListener2 = new TestConsumerRebalanceListener()
     zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+    zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     // Consume messages from consumer 1 to make sure it has finished rebalance
     getMessages(topicMessageStreams1, nMessages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 699715b..6430b33 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -100,9 +100,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
           controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
           log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
         } catch {
-          case e : Exception => {
-            log.info("Thread interrupted")
-          }
+          case _: Exception => log.info("Thread interrupted")
         }
       }
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 0a1032f..a52fe48 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -156,7 +156,7 @@ class GroupMetadataManagerTest {
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+    member.awaitingJoinCallback = _ => ()
     group.add(memberId, member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
@@ -389,7 +389,7 @@ class GroupMetadataManagerTest {
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+    member.awaitingJoinCallback = _ => ()
     group.add(memberId, member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 8539340..bf695bf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -252,7 +252,7 @@ class GroupMetadataTest extends JUnitSuite {
       protocolType, List(("roundrobin", Array.empty[Byte])))
 
     group.transitionTo(PreparingRebalance)
-    member.awaitingJoinCallback = (result) => {}
+    member.awaitingJoinCallback = _ => ()
     group.add(memberId, member)
 
     assertEquals(0, group.generationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4515b94..2221d90 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -83,7 +83,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
       TestUtils.getBrokerListStrFromServers(servers),
       keyEncoder = classOf[StringEncoder].getName)
 
-    for(i <- 0 until numMessages)
+    for(_ <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
     // update offset in zookeeper for consumer to jump "forward" in time
@@ -103,12 +103,12 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
     var received = 0
     val iter = messageStream.iterator
     try {
-      for (i <- 0 until numMessages) {
+      for (_ <- 0 until numMessages) {
         iter.next // will throw a timeout exception if the message isn't there
         received += 1
       }
     } catch {
-      case e: ConsumerTimeoutException => 
+      case _: ConsumerTimeoutException =>
         info("consumer timed out after receiving " + received + " messages.")
     } finally {
       producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 881837f..003c04c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -48,7 +48,7 @@ class FetcherTest extends KafkaServerTestHarness {
 
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
     fetcher.stopConnections()
-    val topicInfos = configs.map(c =>
+    val topicInfos = configs.map(_ =>
       new PartitionTopicInfo(topic,
         0,
         queue,
@@ -81,13 +81,10 @@ class FetcherTest extends KafkaServerTestHarness {
 
   def fetch(expected: Int) {
     var count = 0
-    while(true) {
+    while (count < expected) {
       val chunk = queue.poll(2L, TimeUnit.SECONDS)
       assertNotNull("Timed out waiting for data chunk " + (count + 1), chunk)
-      for(message <- chunk.messages)
-        count += 1
-      if(count == expected)
-        return
+      count += chunk.messages.size
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 3998a21..201fa87 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -152,14 +152,14 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
         response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
-        case e: OffsetOutOfRangeException => // This is good.
+        case _: OffsetOutOfRangeException => // This is good.
       }
     }
 
     {
       // send some invalid partitions
       val builder = new FetchRequestBuilder()
-      for((topic, partition) <- topics)
+      for((topic, _) <- topics)
         builder.addFetch(topic, -1, 0, 10000)
 
       try {
@@ -168,7 +168,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
         response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
-        case e: UnknownTopicOrPartitionException => // This is good.
+        case _: UnknownTopicOrPartitionException => // This is good.
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 83cce77..7d8e0c2 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -72,10 +72,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     // call createMesssageStreams twice should throw MessageStreamsExistException
     try {
-      val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+      zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
       fail("Should fail with MessageStreamsExistException")
     } catch {
-      case e: MessageStreamsExistException => // expected
+      case _: MessageStreamsExistException => // expected
     }
     zkConsumerConnector1.shutdown
     info("all consumer connectors stopped")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 8702474..a7f0446 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -46,7 +46,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testFileSize() {
     assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
-    for(i <- 0 until 20) {
+    for (_ <- 0 until 20) {
       messageSet.append(singleMessageSet("abcd".getBytes))
       assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
     } 
@@ -66,9 +66,8 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   
   def testPartialWrite(size: Int, messageSet: FileMessageSet) {
     val buffer = ByteBuffer.allocate(size)
-    val originalPosition = messageSet.channel.position
-    for(i <- 0 until size)
-      buffer.put(0.asInstanceOf[Byte])
+    for (_ <- 0 until size)
+      buffer.put(0: Byte)
     buffer.rewind()
     messageSet.channel.write(buffer)
     // appending those bytes should not change the contents
@@ -195,7 +194,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
       msgSet.truncateTo(43)
       fail("Should throw KafkaException")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
 
     EasyMock.verify(channelMock)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 14d24f7..36c61d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -120,9 +120,9 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   }
 
   private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
-    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      val info = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
       counter += 1
       (key, count)
     }
@@ -185,4 +185,4 @@ object LogCleanerLagIntegrationTest {
       list.add(Array(codec.name))
     list
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 4fa73dc..6e5806f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -173,15 +173,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     cleanerManager
   }
 
-  private def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
-    // append some messages to create some segments
-    for (i <- 0 until 100)
-      log.append(set)
-
-    // expire all segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
-  }
-
   private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 24a6366..d80fba1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -254,7 +254,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // create 6 segments with only one message in each segment
     val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
-    for (i <- 0 until 6)
+    for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
     val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
@@ -272,7 +272,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // create 6 segments with only one message in each segment
     val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
-    for (i <- 0 until 6)
+    for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
     // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
@@ -281,7 +281,6 @@ class LogCleanerTest extends JUnitSuite {
 
     val expectedCleanSize = segs.take(2).map(_.size).sum
     val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
-    val expectedUncleanableSize = segs.drop(4).map(_.size).sum
 
     assertEquals("Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty",
       logToClean.cleanBytes, expectedCleanSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 6d34c78..569264a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -60,7 +60,7 @@ class LogConfigTest {
       case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
       case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
       case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
-      case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+      case _ => assertPropertyInvalid(name, "not_a_number", "-1")
     })
   }
 
@@ -105,8 +105,4 @@ class LogConfigTest {
     })
   }
 
-  private def randFrom[T](choices: T*): T = {
-    import scala.util.Random
-    choices(Random.nextInt(choices.size))
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 95a1cb5..5421da9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -88,8 +88,8 @@ class LogManagerTest {
   def testCleanupExpiredSegments() {
     val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
     var offset = 0L
-    for(i <- 0 until 200) {
-      var set = TestUtils.singleMessageSet("test".getBytes())
+    for(_ <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
@@ -107,7 +107,7 @@ class LogManagerTest {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
     log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -134,7 +134,7 @@ class LogManagerTest {
 
     // add a bunch of messages that should be larger than the retentionSize
     val numMessages = 200
-    for(i <- 0 until numMessages) {
+    for (_ <- 0 until numMessages) {
       val set = TestUtils.singleMessageSet("test".getBytes())
       val info = log.append(set)
       offset = info.firstOffset
@@ -152,7 +152,7 @@ class LogManagerTest {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
     log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -168,8 +168,8 @@ class LogManagerTest {
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
     val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
-    for(i <- 0 until 200) {
-      var set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+    for (_ <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
@@ -197,8 +197,8 @@ class LogManagerTest {
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
-    for(i <- 0 until 200) {
-      var set = TestUtils.singleMessageSet("test".getBytes())
+    for (_ <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
     time.sleep(logManager.InitialTaskDelayMs)
@@ -215,7 +215,7 @@ class LogManagerTest {
                      TestUtils.tempDir(),
                      TestUtils.tempDir())
     logManager.shutdown()
-    logManager = createLogManager()
+    logManager = createLogManager(dirs)
 
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -235,7 +235,7 @@ class LogManagerTest {
       createLogManager()
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
-      case e: KafkaException => // this is good
+      case _: KafkaException => // this is good
     }
   }
 
@@ -279,7 +279,7 @@ class LogManagerTest {
                                        logManager: LogManager) {
     val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
     logs.foreach(log => {
-      for(i <- 0 until 50)
+      for (_ <- 0 until 50)
         log.append(TestUtils.singleMessageSet("test".getBytes()))
 
       log.flush()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 04d46de..7f78148 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -138,7 +138,7 @@ class LogSegmentTest {
   def testTruncate() {
     val seg = createSegment(40)
     var offset = 40
-    for(i <- 0 until 30) {
+    for (_ <- 0 until 30) {
       val ms1 = messages(offset, "hello")
       seg.append(offset, Message.NoTimestamp, -1L, ms1)
       val ms2 = messages(offset + 1, "hello")
@@ -160,7 +160,7 @@ class LogSegmentTest {
     val numMessages = 30
     val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
     var offset = 40
-    for (i <- 0 until numMessages) {
+    for (_ <- 0 until numMessages) {
       seg.append(offset, offset, offset, messages(offset, "hello"))
       offset += 1
     }
@@ -279,7 +279,7 @@ class LogSegmentTest {
   @Test
   def testRecoveryWithCorruptMessage() {
     val messagesAppended = 20
-    for(iteration <- 0 until 10) {
+    for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
         seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9ecb651..d18719a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -163,7 +163,7 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
-    for (i<- 1 to (msgPerSeg + 1)) {
+    for (_ <- 1 to (msgPerSeg + 1)) {
       log.append(set)
     }
     assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
@@ -326,14 +326,14 @@ class LogTest extends JUnitSuite {
       log.read(0, 1000)
       fail("Reading below the log start offset should throw OffsetOutOfRangeException")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
 
     try {
       log.read(1026, 1000)
       fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
 
     assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
@@ -448,7 +448,7 @@ class LogTest extends JUnitSuite {
       log.append(messageSet)
       fail("message set should throw RecordBatchTooLargeException.")
     } catch {
-      case e: RecordBatchTooLargeException => // this is good
+      case _: RecordBatchTooLargeException => // this is good
     }
   }
 
@@ -475,19 +475,19 @@ class LogTest extends JUnitSuite {
       log.append(messageSetWithUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: CorruptRecordException => // this is good
+      case _: CorruptRecordException => // this is good
     }
     try {
       log.append(messageSetWithOneUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: CorruptRecordException => // this is good
+      case _: CorruptRecordException => // this is good
     }
     try {
       log.append(messageSetWithCompressedUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: CorruptRecordException => // this is good
+      case _: CorruptRecordException => // this is good
     }
 
     // the following should succeed without any InvalidMessageException
@@ -518,7 +518,7 @@ class LogTest extends JUnitSuite {
       log.append(second)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
-      case e: RecordTooLargeException => // this is good
+      case _: RecordTooLargeException => // this is good
     }
   }
   /**
@@ -725,7 +725,7 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
-    for (i<- 1 to msgPerSeg)
+    for (_ <- 1 to msgPerSeg)
       log.append(set)
 
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
@@ -746,7 +746,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", 0, log.logEndOffset)
     assertEquals("Should change log size", 0, log.size)
 
-    for (i<- 1 to msgPerSeg)
+    for (_ <- 1 to msgPerSeg)
       log.append(set)
 
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
@@ -755,7 +755,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
     assertEquals("Should change log size", log.size, 0)
 
-    for (i<- 1 to msgPerSeg)
+    for (_ <- 1 to msgPerSeg)
       log.append(set)
 
     assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
@@ -831,7 +831,7 @@ class LogTest extends JUnitSuite {
     assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists)
 
     // check that we can append to the log
-    for(i <- 0 until 10)
+    for (_ <- 0 until 10)
       log.append(set)
 
     log.delete()
@@ -857,7 +857,7 @@ class LogTest extends JUnitSuite {
                       time)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
     log.close()
     log = new Log(logDir,
@@ -892,7 +892,7 @@ class LogTest extends JUnitSuite {
                       time)
 
     // append some messages to create some segments
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     // files should be renamed
@@ -934,7 +934,7 @@ class LogTest extends JUnitSuite {
                       time)
 
     // append some messages to create some segments
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     // expire all segments
@@ -986,7 +986,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val set = TestUtils.singleMessageSet("test".getBytes)
     val recoveryPoint = 50L
-    for(iteration <- 0 until 50) {
+    for (_ <- 0 until 50) {
       // create a log and write some messages to it
       logDir.mkdirs()
       var log = new Log(logDir,
@@ -995,7 +995,7 @@ class LogTest extends JUnitSuite {
                         time.scheduler,
                         time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
-      for(i <- 0 until numMessages)
+      for (_ <- 0 until numMessages)
         log.append(set)
       val messages = log.logSegments.flatMap(_.log.iterator.toList)
       log.close()
@@ -1033,7 +1033,7 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
     log.close()
 
@@ -1062,7 +1062,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1073,7 +1073,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1086,7 +1086,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1099,7 +1099,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1112,7 +1112,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1134,7 +1134,7 @@ class LogTest extends JUnitSuite {
       time)
 
     // append some messages to create some segments
-    for (i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     // expire all segments
@@ -1143,7 +1143,7 @@ class LogTest extends JUnitSuite {
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
 
     // append some messages to create some segments
-    for (i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     log.delete()
@@ -1158,7 +1158,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments
@@ -1171,7 +1171,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments
@@ -1184,7 +1184,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionMs = 10000)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments()
@@ -1197,7 +1197,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionMs = 10000000)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments()
@@ -1212,7 +1212,7 @@ class LogTest extends JUnitSuite {
       cleanupPolicy = "compact")
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     // mark oldest segment as older the retention.ms
@@ -1231,7 +1231,7 @@ class LogTest extends JUnitSuite {
       cleanupPolicy = "compact,delete")
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 869e618..7618cf7 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -157,7 +157,7 @@ class OffsetIndexTest extends JUnitSuite {
     val rand = new Random(1L)
     val vals = new mutable.ArrayBuffer[Int](len)
     var last = base
-    for (i <- 0 until len) {
+    for (_ <- 0 until len) {
       last += rand.nextInt(15) + 1
       vals += last
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index a5bec17..b079f25 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -62,7 +62,6 @@ class OffsetMapTest extends JUnitSuite {
     val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
     for(i <- 0 until items)
       map.put(key(i), i)
-    var misses = 0
     for(i <- 0 until items)
       assertEquals(map.get(key(i)), i.toLong)
     map
@@ -85,4 +84,4 @@ object OffsetMapTest {
     println(map.size + " entries in map of size " + map.slots + " in " + ellapsedMs + " ms")
     println("Collision rate: %.1f%%".format(100*map.collisionRate))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 18a023c..e2cfb87 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -293,7 +293,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                 messageTimestampDiffMaxMs = 1000L)
       fail("Should throw InvalidMessageException.")
     } catch {
-      case e: InvalidTimestampException =>
+      case _: InvalidTimestampException =>
     }
 
     try {
@@ -306,7 +306,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                           messageTimestampDiffMaxMs = 1000L)
       fail("Should throw InvalidMessageException.")
     } catch {
-      case e: InvalidTimestampException =>
+      case _: InvalidTimestampException =>
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 1438523..e8abfe1 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -94,8 +94,7 @@ class MessageCompressionTest extends JUnitSuite {
       new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
       true
     } catch {
-      case e: UnsatisfiedLinkError => false
-      case e: org.xerial.snappy.SnappyError => false
+      case _: UnsatisfiedLinkError | _: org.xerial.snappy.SnappyError => false
     }
   }
 
@@ -104,7 +103,7 @@ class MessageCompressionTest extends JUnitSuite {
       new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
       true
     } catch {
-      case e: UnsatisfiedLinkError => false
+      case _: UnsatisfiedLinkError => false
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e60f350..da6f260 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -159,7 +159,7 @@ class SocketServerTest extends JUnitSuite {
       outgoing.flush()
       receiveResponse(socket)
     } catch {
-      case e: IOException => // thats fine
+      case _: IOException => // thats fine
     }
   }
 
@@ -186,14 +186,14 @@ class SocketServerTest extends JUnitSuite {
       sendRequest(plainSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed plain socket")
     } catch {
-      case e: IOException => // expected
+      case _: IOException => // expected
     }
 
     try {
       sendRequest(traceSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed trace socket")
     } catch {
-      case e: IOException => // expected
+      case _: IOException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 3088199..3093b93 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -76,7 +76,7 @@ class AsyncProducerTest {
       fail("Queue should be full")
     }
     catch {
-      case e: QueueFullException => //expected
+      case _: QueueFullException => //expected
     }finally {
       producer.close()
     }
@@ -96,7 +96,7 @@ class AsyncProducerTest {
       fail("should complain that producer is already closed")
     }
     catch {
-      case e: ProducerClosedException => //expected
+      case _: ProducerClosedException => //expected
     }
   }
 
@@ -266,7 +266,7 @@ class AsyncProducerTest {
     }
     catch {
       // should not throw any exception
-      case e: Throwable => fail("Should not throw any exception")
+      case _: Throwable => fail("Should not throw any exception")
 
     }
   }
@@ -298,7 +298,7 @@ class AsyncProducerTest {
       fail("Should fail with FailedToSendMessageException")
     }
     catch {
-      case e: FailedToSendMessageException => // we retry on any exception now
+      case _: FailedToSendMessageException => // we retry on any exception now
     }
   }
 
@@ -317,8 +317,8 @@ class AsyncProducerTest {
       producer.send(getProduceData(1): _*)
       fail("Should fail with ClassCastException due to incompatible Encoder")
     } catch {
-      case e: ClassCastException =>
-    }finally {
+      case _: ClassCastException =>
+    } finally {
       producer.close()
     }
   }
@@ -352,9 +352,9 @@ class AsyncProducerTest {
     val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        for ((brokerId, dataPerBroker) <- partitionedData) {
-          for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
-            assertTrue(partitionId == 0)
+        for (dataPerBroker <- partitionedData.values) {
+          for (tp <- dataPerBroker.keys)
+            assertTrue(tp.partition == 0)
         }
       case None =>
         fail("Failed to collate requests by topic, partition")
@@ -461,7 +461,7 @@ class AsyncProducerTest {
       fail("should complain about wrong config")
     }
     catch {
-      case e: IllegalArgumentException => //expected
+      case _: IllegalArgumentException => //expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dc73db3..f4a339e 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -115,15 +115,12 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
         keyEncoder = classOf[StringEncoder].getName,
         producerProps = props)
 
-    try{
+    try {
       producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: FailedToSendMessageException => // this is expected
-      case oe: Throwable => fail("fails with exception", oe)
-    } finally {
-      producer1.close()
-    }
+      case _: FailedToSendMessageException => // this is expected
+    } finally producer1.close()
 
     val producer2 = TestUtils.createProducer[String, String](
       brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
@@ -216,9 +213,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
         fail("we don't support request.required.acks greater than 1")
     }
     catch {
-      case iae: IllegalArgumentException =>  // this is expected
-      case e: Throwable => fail("Not expected", e)
-
+      case _: IllegalArgumentException =>  // this is expected
     }
   }
 
@@ -261,7 +256,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e : TestFailedException => throw e // catch and re-throw the failure message
-      case e2: Throwable => // otherwise success
+      case _: Throwable => // otherwise success
     }
 
     // restart server 1
@@ -290,6 +285,10 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
   @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
+    val topic = "new-topic"
+    // create topics in ZK
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+
     val timeoutMs = 500
     val props = new Properties()
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
@@ -303,10 +302,6 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       partitioner = classOf[StaticPartitioner].getName,
       producerProps = props)
 
-    val topic = "new-topic"
-    // create topics in ZK
-    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
     // do a simple test to make sure plumbing is okay
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
@@ -315,30 +310,25 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
-    } catch {
-      case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
-    }
-
-    // stop IO threads and request handling, but leave networking operational
-    // any requests should be accepted and queue up, but not handled
-    server1.requestHandlerPool.shutdown()
-
-    val t1 = SystemTime.milliseconds
-    try {
-      // this message should be assigned to partition 0 whose leader is on broker 0, but
-      // broker 0 will not response within timeoutMs millis.
-      producer.send(new KeyedMessage[String, String](topic, "test", "test"))
-    } catch {
-      case e: FailedToSendMessageException => /* success */
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      producer.close()
-    }
-    val t2 = SystemTime.milliseconds
-
-    // make sure we don't wait fewer than timeoutMs
-    assertTrue((t2-t1) >= timeoutMs)
+      assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
+
+      // stop IO threads and request handling, but leave networking operational
+      // any requests should be accepted and queue up, but not handled
+      server1.requestHandlerPool.shutdown()
+
+      val t1 = SystemTime.milliseconds
+      try {
+        // this message should be assigned to partition 0 whose leader is on broker 0, but
+        // broker 0 will not response within timeoutMs millis.
+        producer.send(new KeyedMessage[String, String](topic, "test", "test"))
+      } catch {
+        case _: FailedToSendMessageException => /* success */
+      }
+      val t2 = SystemTime.milliseconds
+      // make sure we don't wait fewer than timeoutMs
+      assertTrue((t2-t1) >= timeoutMs)
+
+    } finally producer.close()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 968ea4b..7e72eec 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -151,8 +151,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
       producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
     } catch {
-      case e : java.io.IOException => // success
-      case e2: Throwable => throw e2
+      case _ : java.io.IOException => // success
     }
   }
 
@@ -222,8 +221,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
       producer.send(request)
       fail("Should have received timeout exception since request handling is stopped.")
     } catch {
-      case e: SocketTimeoutException => /* success */
-      case e: Throwable => fail("Unexpected exception when expecting timeout: " + e)
+      case _: SocketTimeoutException => /* success */
     }
     val t2 = SystemTime.milliseconds
     // make sure we don't wait fewer than timeoutMs for a response

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
index 0a28e8f..1df34ea 100644
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -31,7 +31,7 @@ class OperationTest extends JUnitSuite {
       Operation.fromString("badName")
       fail("Expected exception on invalid operation name.")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
index 6cd1ad7..1b1c864 100644
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -31,7 +31,7 @@ class PermissionTypeTest extends JUnitSuite {
       PermissionType.fromString("badName")
       fail("Expected exception on invalid PermissionType name.")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
index 3653a7d..546c92e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -31,7 +31,7 @@ class ResourceTypeTest extends JUnitSuite {
       ResourceType.fromString("badName")
       fail("Expected exception on invalid ResourceType name.")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1f52af4..0765992 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -347,7 +347,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
 
     // Alternate authorizer to keep adding and removing zookeeper path
-    val concurrentFuctions = (0 to 50).map { i =>
+    val concurrentFuctions = (0 to 50).map { _ =>
       () => {
         simpleAclAuthorizer.addAcls(Set(acl), resource)
         simpleAclAuthorizer2.removeAcls(Set(acl), resource)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 9203130..8cec0c7 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -65,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       JaasUtils.isZkSecurityEnabled()
       fail("Should have thrown an exception")
     } catch {
-      case e: KafkaException => // Expected
+      case _: KafkaException => // Expected
     }
   }
 
@@ -286,7 +286,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     
     // Fail the test if able to delete
     result match {
-      case Success(v) => // All done
+      case Success(_) => // All done
       case Failure(e) => fail(e.getMessage)
     }
   }
@@ -302,7 +302,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
         case "/" => deleteRecursive(zkUtils, s"/$child")
         case path => deleteRecursive(zkUtils, s"$path/$child")
       }) match {
-        case Success(v) => result
+        case Success(_) => result
         case Failure(e) => Failure(e)
       }
     path match {
@@ -314,7 +314,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
           zkUtils.deletePath(path)
           Failure(new Exception(s"Have been able to delete $path"))
         } catch {
-          case e: Exception => result
+          case _: Exception => result
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 291e822..b581341 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -229,7 +229,7 @@ class ClientQuotaManagerTest {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
        */
-      for (i <- 0 until 10) {
+      for (_ <- 0 until 10) {
         clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
         time.sleep(1000)
       }
@@ -256,7 +256,7 @@ class ClientQuotaManagerTest {
       assertEquals(11, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
-      for (i <- 0 until 10) {
+      for (_ <- 0 until 10) {
         clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
         time.sleep(1000)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index faa23f0..2e50d30 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -168,7 +168,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
-      case e: AdminOperationException => // expected
+      case _: AdminOperationException => // expected
     }
   }
 
@@ -198,7 +198,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
-      case t: Throwable =>
+      case _: Throwable =>
     }
     // Version is provided. EntityType is incorrect
     try {
@@ -207,7 +207,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
-      case t: Throwable =>
+      case _: Throwable =>
     }
 
     // EntityName isn't provided
@@ -217,7 +217,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
-      case t: Throwable =>
+      case _: Throwable =>
     }
 
     // Everything is provided

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 540a665..0051247 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.message.MessageSet
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
-
 class IsrExpirationTest {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
@@ -52,7 +51,8 @@ class IsrExpirationTest {
 
   @Before
   def setUp() {
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
   }
 
   @After
@@ -66,7 +66,7 @@ class IsrExpirationTest {
    */
   @Test
   def testIsrExpirationForStuckFollowers() {
-    val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
+    val log = logMock
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -75,8 +75,7 @@ class IsrExpirationTest {
 
     // let the follower catch up to the Leader logEndOffset (15)
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
-                                                                 MessageSet.Empty),
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty),
                                                    -1L,
                                                    -1,
                                                    true)))
@@ -97,7 +96,7 @@ class IsrExpirationTest {
    */
   @Test
   def testIsrExpirationIfNoFetchRequestMade() {
-    val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
+    val log = logMock
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -119,7 +118,7 @@ class IsrExpirationTest {
   @Test
   def testIsrExpirationForSlowFollowers() {
     // create leader replica
-    val log = getLogWithLogEndOffset(15L, 4)
+    val log = logMock
     // add one partition
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
@@ -158,7 +157,7 @@ class IsrExpirationTest {
 
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
-    val leaderId=config.brokerId
+    val leaderId = config.brokerId
     val partition = replicaManager.getOrCreatePartition(topic, partitionId)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
@@ -171,12 +170,10 @@ class IsrExpirationTest {
     partition
   }
 
-  private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
-    val log1 = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls)
-    EasyMock.replay(log1)
-
-    log1
+  private def logMock: Log = {
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.replay(log)
+    log
   }
 
   private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 042dabf..93d0413 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -112,7 +112,7 @@ class KafkaConfigTest {
     props5.put("log.retention.ms", "0")
 
     intercept[IllegalArgumentException] {
-      val cfg5 = KafkaConfig.fromProps(props5)
+      KafkaConfig.fromProps(props5)
     }
   }
 
@@ -127,13 +127,13 @@ class KafkaConfigTest {
     props3.put("log.retention.hours", "0")
 
     intercept[IllegalArgumentException] {
-      val cfg1 = KafkaConfig.fromProps(props1)
+      KafkaConfig.fromProps(props1)
     }
     intercept[IllegalArgumentException] {
-      val cfg2 = KafkaConfig.fromProps(props2)
+      KafkaConfig.fromProps(props2)
     }
     intercept[IllegalArgumentException] {
-      val cfg3 = KafkaConfig.fromProps(props3)
+      KafkaConfig.fromProps(props3)
     }
 
   }
@@ -303,7 +303,7 @@ class KafkaConfigTest {
       KafkaConfig.fromProps(props)
       true
     } catch {
-      case e: IllegalArgumentException => false
+      case _: IllegalArgumentException => false
     }
   }
 
@@ -561,8 +561,7 @@ class KafkaConfigTest {
         case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
         case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
         case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
-
-        case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
+        case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0885709..0f3ee63 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -90,7 +90,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val log = logManager.getLog(TopicAndPartition(topic, part)).get
 
     val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 20)
+    for (_ <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
@@ -125,7 +125,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     var offsetChanged = false
-    for(i <- 1 to 14) {
+    for (_ <- 1 to 14) {
       val topicAndPartition = TopicAndPartition(topic, 0)
       val offsetRequest =
         OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
@@ -151,7 +151,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
     val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 20)
+    for (_ <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
@@ -180,7 +180,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
     val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 20)
+    for (_ <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index b34c93d..138c36d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -267,7 +267,7 @@ class MetadataCacheTest {
       fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
     }
     catch {
-      case e: BrokerEndPointNotAvailableException => //expected
+      case _: BrokerEndPointNotAvailableException => //expected
     }
 
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 29eaf2d..64c67d6 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -296,7 +296,6 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     createTopic(zkUtils, topic2, servers = Seq(server), numPartitions = 1)
 
     // Commit an offset
-    val expectedReplicaAssignment = Map(0  -> List(1))
     val commitRequest = OffsetCommitRequest(group, immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
       TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 825b2b4..3a09737 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -120,7 +120,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Add data equally to each partition
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
-    (0 until msgCount).foreach { x =>
+    (0 until msgCount).foreach { _ =>
       (0 to 7).foreach { partition =>
         producer.send(new ProducerRecord(topic, partition, null, msg))
       }
@@ -215,7 +215,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
   def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
-    (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get }
+    (0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
     waitForOffsetsToMatch(msgCount, 0, 100)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4e4fb95..9e19e39 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
         sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
         fail("Versions Request during Sasl handshake did not fail")
       } catch {
-        case ioe: IOException => // expected exception
+        case _: IOException => // expected exception
       }
     } finally {
       plaintextSocket.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index e0b6db4..f21f2de 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -130,7 +130,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     try {
       server1.startup()
     } catch {
-      case e: kafka.common.InconsistentBrokerIdException => //success
+      case _: kafka.common.InconsistentBrokerIdException => //success
     }
     server1.shutdown()
     CoreUtils.delete(server1.config.logDirs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bc71edd..fd0a460 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -129,16 +129,14 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
     try {
       server.startup()
-      fail("Expected KafkaServer setup to fail, throw exception")
+      fail("Expected KafkaServer setup to fail and throw exception")
     }
     catch {
       // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
       // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
       // goes wrong so that awaitShutdown doesn't hang
-      case e: org.I0Itec.zkclient.exception.ZkException =>
+      case _: org.I0Itec.zkclient.exception.ZkException =>
         assertEquals(NotRunning.state, server.brokerState.currentState)
-      case e: Throwable =>
-        fail("Expected ZkException during Kafka server starting up but caught a different exception %s".format(e.toString))
     }
     finally {
       if (server.brokerState.currentState != NotRunning.state)
@@ -170,7 +168,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       assertTrue(true)
     }
     catch{
-      case ex: Throwable => fail()
+      case _: Throwable => fail()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index b5560c3..86b167e 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -58,7 +58,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
       TestUtils.createServer(KafkaConfig.fromProps(props2))
       fail("Registering a broker with a conflicting id should fail")
     } catch {
-      case e : RuntimeException =>
+      case _: RuntimeException =>
       // this is expected
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index 741eec9..da80c0d 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -62,7 +62,7 @@ class ConsoleProducerTest {
       new ConsoleProducer.ProducerConfig(invalidArgs)
       Assert.fail("Should have thrown an UnrecognizedOptionException")
     } catch {
-      case e: joptsimple.OptionException => // expected exception
+      case _: joptsimple.OptionException => // expected exception
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3796e48..83fd6b8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)
-    for(i <- 0 until size)
+    for (_ <- 0 until size)
       file.writeByte(random.nextInt(255))
     file.close()
   }
 
   def appendNonsenseToFile(fileName: File, size: Int) {
     val file = new FileOutputStream(fileName, true)
-    for(i <- 0 until size)
+    for (_ <- 0 until size)
       file.write(random.nextInt(255))
     file.close()
   }
@@ -984,7 +984,7 @@ object TestUtils extends Logging {
 
     var messages: List[String] = Nil
     val shouldGetAllMessages = nMessagesPerThread < 0
-    for ((topic, messageStreams) <- topicMessageStreams) {
+    for (messageStreams <- topicMessageStreams.values) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator()
         try {
@@ -1117,7 +1117,7 @@ object TestUtils extends Logging {
           }
       }
     } catch {
-      case ie: InterruptedException => failWithTimeout()
+      case _: InterruptedException => failWithTimeout()
       case e: Throwable => exceptions += e
     } finally {
       threadPool.shutdownNow()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
index 29c9067..64129e9 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -35,8 +35,6 @@ class TimerTaskListTest {
   @Test
   def testAll() {
     val sharedCounter = new AtomicInteger(0)
-    val runCounter = new AtomicInteger(0)
-    val execCounter = new AtomicInteger(0)
     val list1 = new TimerTaskList(sharedCounter)
     val list2 = new TimerTaskList(sharedCounter)
     val list3 = new TimerTaskList(sharedCounter)
@@ -46,7 +44,7 @@ class TimerTaskListTest {
       list1.add(new TimerTaskEntry(task, 10L))
       assertEquals(i, sharedCounter.get)
       task
-    }.toSeq
+    }
 
     assertEquals(tasks.size, sharedCounter.get)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index c2c25ed..4d57ed9 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -80,7 +80,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     try {
       zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
     } catch {                       
-      case e: Exception =>
+      case _: Exception =>
     }
 
     var testData: String = null
@@ -147,7 +147,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
         zwe.create()
         false
       } catch {
-        case e: ZkNodeExistsException => true
+        case _: ZkNodeExistsException => true
       }
     Assert.assertTrue(gotException)
     zkClient2.close()
@@ -155,7 +155,6 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
   
   /**
    * Tests if succeeds with znode from the same session
-   * 
    */
   @Test
   def testSameSession = {
@@ -171,7 +170,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
         zwe.create()
         false
       } catch {
-        case e: ZkNodeExistsException => true
+        case _: ZkNodeExistsException => true
       }
     Assert.assertFalse(gotException)
   }


[4/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables

Posted by ij...@apache.org.
MINOR: A bunch of clean-ups related to usage of unused variables

There should be only one case where these clean-ups have a functional impact: replaced repeated identical logs with a single log for the stale controller epoch case.

The rest should just make the code easier to read and make it a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1985 from ijuma/remove-unused


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0926738
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0926738
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0926738

Branch: refs/heads/trunk
Commit: d092673838173d9dedbf5acf3f4e2cd8c736294f
Parents: 1fc450f
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Oct 25 02:55:55 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Oct 25 02:55:55 2016 +0100

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  2 -
 .../consumer/internals/ConsumerCoordinator.java |  2 -
 .../consumer/internals/SubscriptionState.java   |  7 --
 .../kafka/clients/producer/KafkaProducer.java   |  1 -
 .../clients/producer/internals/Sender.java      | 10 +--
 .../apache/kafka/common/network/MultiSend.java  |  6 +-
 .../common/network/PlaintextChannelBuilder.java |  4 +-
 .../kafka/common/network/SslChannelBuilder.java |  4 +-
 .../kafka/common/network/SslTransportLayer.java |  5 +-
 .../authenticator/SaslServerAuthenticator.java  |  5 +-
 .../apache/kafka/clients/NetworkClientTest.java |  3 +-
 .../producer/internals/BufferPoolTest.java      |  2 +-
 .../clients/producer/internals/SenderTest.java  |  3 -
 .../apache/kafka/connect/data/Timestamp.java    |  4 --
 .../runtime/distributed/WorkerCoordinator.java  |  5 +-
 .../runtime/rest/entities/ConnectorInfo.java    |  9 ---
 .../src/main/scala/kafka/admin/AclCommand.scala |  1 -
 .../src/main/scala/kafka/admin/AdminUtils.scala |  6 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  | 10 +--
 .../kafka/admin/ConsumerGroupCommand.scala      |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  4 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  4 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |  1 -
 .../kafka/api/ControlledShutdownResponse.scala  |  2 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |  7 +-
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  4 +-
 .../scala/kafka/api/OffsetCommitResponse.scala  |  2 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |  5 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |  5 +-
 .../main/scala/kafka/client/ClientUtils.scala   |  6 +-
 .../scala/kafka/cluster/BrokerEndPoint.scala    |  2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  3 +-
 core/src/main/scala/kafka/cluster/Replica.scala |  7 +-
 .../ZkNodeChangeNotificationListener.scala      |  2 +-
 .../kafka/consumer/PartitionAssignor.scala      |  4 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |  4 +-
 .../main/scala/kafka/consumer/TopicFilter.scala |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 24 +++----
 .../kafka/controller/KafkaController.scala      | 74 +++++++++-----------
 .../controller/PartitionStateMachine.scala      | 12 ++--
 .../kafka/controller/ReplicaStateMachine.scala  |  2 +-
 .../kafka/controller/TopicDeletionManager.scala |  4 +-
 .../coordinator/GroupMetadataManager.scala      |  2 +-
 core/src/main/scala/kafka/log/Log.scala         |  4 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  2 +-
 .../scala/kafka/log/LogCleanerManager.scala     | 10 ++-
 .../kafka/message/ByteBufferMessageSet.scala    |  2 +-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala | 19 ++---
 .../scala/kafka/network/BlockingChannel.scala   |  2 +-
 .../kafka/producer/DefaultPartitioner.scala     |  2 -
 .../main/scala/kafka/producer/Producer.scala    |  2 +-
 .../scala/kafka/security/auth/Resource.scala    |  2 +-
 .../security/auth/SimpleAclAuthorizer.scala     |  4 +-
 .../kafka/server/AbstractFetcherManager.scala   |  2 +-
 .../main/scala/kafka/server/AdminManager.scala  |  2 +-
 .../kafka/server/BrokerMetadataCheckpoint.scala |  2 +-
 .../scala/kafka/server/ClientQuotaManager.scala |  8 +--
 .../main/scala/kafka/server/DelayedFetch.scala  |  4 +-
 .../kafka/server/DynamicConfigManager.scala     |  3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |  1 -
 .../scala/kafka/server/OffsetCheckpoint.scala   |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     | 21 +++---
 .../kafka/server/ZookeeperLeaderElector.scala   |  2 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |  6 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |  2 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 10 ++-
 .../scala/kafka/tools/EndToEndLatency.scala     |  3 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 10 +--
 .../scala/kafka/tools/ReplayLogProducer.scala   |  5 +-
 .../kafka/tools/ReplicaVerificationTool.scala   | 21 +++---
 core/src/main/scala/kafka/utils/CoreUtils.scala |  4 +-
 core/src/main/scala/kafka/utils/FileLock.scala  |  4 +-
 .../src/main/scala/kafka/utils/Mx4jLoader.scala |  6 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |  4 +-
 .../kafka/utils/VerifiableProperties.scala      |  4 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 30 ++++----
 .../integration/kafka/api/AdminClientTest.scala |  1 -
 .../kafka/api/AuthorizerIntegrationTest.scala   | 15 ++--
 .../kafka/api/BaseConsumerTest.scala            |  2 +-
 .../kafka/api/BaseProducerSendTest.scala        | 21 +++---
 .../kafka/api/EndToEndAuthorizationTest.scala   |  2 +-
 .../kafka/api/FixedPortTestUtils.scala          |  2 +-
 .../kafka/api/IntegrationTestHarness.scala      |  4 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 13 ++--
 .../kafka/api/PlaintextProducerSendTest.scala   |  4 +-
 .../kafka/api/ProducerBounceTest.scala          |  4 +-
 .../scala/kafka/security/minikdc/MiniKdc.scala  |  5 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  2 +-
 .../test/scala/other/kafka/StressTestLog.scala  |  3 +-
 .../scala/other/kafka/TestCrcPerformance.scala  | 14 ++--
 .../scala/other/kafka/TestKafkaAppender.scala   |  9 +--
 .../other/kafka/TestLinearWriteSpeed.scala      |  3 +-
 .../scala/other/kafka/TestOffsetManager.scala   |  6 +-
 .../other/kafka/TestPurgatoryPerformance.scala  |  2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  6 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  4 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  7 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |  4 +-
 .../admin/ReassignPartitionsCommandTest.scala   |  2 +-
 .../scala/unit/kafka/api/ApiUtilsTest.scala     | 12 ++--
 .../api/RequestResponseSerializationTest.scala  | 36 ----------
 .../scala/unit/kafka/common/ConfigTest.scala    |  8 +--
 .../scala/unit/kafka/common/TopicTest.scala     |  6 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   | 13 ++--
 .../kafka/consumer/PartitionAssignorTest.scala  | 26 +++----
 .../ZookeeperConsumerConnectorTest.scala        | 19 +++--
 .../controller/ControllerFailoverTest.scala     |  4 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  4 +-
 .../kafka/coordinator/GroupMetadataTest.scala   |  2 +-
 .../kafka/integration/AutoOffsetResetTest.scala |  6 +-
 .../unit/kafka/integration/FetcherTest.scala    |  9 +--
 .../kafka/integration/PrimitiveApiTest.scala    |  6 +-
 .../ZookeeperConsumerConnectorTest.scala        |  4 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  9 ++-
 .../log/LogCleanerLagIntegrationTest.scala      |  6 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  9 ---
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  5 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  6 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   | 24 +++----
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  6 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 62 ++++++++--------
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  2 +-
 .../scala/unit/kafka/log/OffsetMapTest.scala    |  3 +-
 .../message/ByteBufferMessageSetTest.scala      |  4 +-
 .../kafka/message/MessageCompressionTest.scala  |  5 +-
 .../unit/kafka/network/SocketServerTest.scala   |  6 +-
 .../unit/kafka/producer/AsyncProducerTest.scala | 20 +++---
 .../unit/kafka/producer/ProducerTest.scala      | 66 ++++++++---------
 .../unit/kafka/producer/SyncProducerTest.scala  |  6 +-
 .../kafka/security/auth/OperationTest.scala     |  2 +-
 .../security/auth/PermissionTypeTest.scala      |  2 +-
 .../kafka/security/auth/ResourceTypeTest.scala  |  2 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |  2 +-
 .../security/auth/ZkAuthorizationTest.scala     |  8 +--
 .../kafka/server/ClientQuotaManagerTest.scala   |  4 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  8 +--
 .../unit/kafka/server/ISRExpirationTest.scala   | 25 +++----
 .../unit/kafka/server/KafkaConfigTest.scala     | 13 ++--
 .../scala/unit/kafka/server/LogOffsetTest.scala |  8 +--
 .../unit/kafka/server/MetadataCacheTest.scala   |  2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  1 -
 .../kafka/server/ReplicationQuotasTest.scala    |  4 +-
 .../server/SaslApiVersionsRequestTest.scala     |  2 +-
 .../server/ServerGenerateBrokerIdTest.scala     |  2 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |  8 +--
 .../unit/kafka/server/ServerStartupTest.scala   |  2 +-
 .../unit/kafka/tools/ConsoleProducerTest.scala  |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  8 +--
 .../kafka/utils/timer/TimerTaskListTest.scala   |  4 +-
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  7 +-
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 50 ++++---------
 .../scala/unit/kafka/zk/ZkFourLetterWords.scala |  2 +-
 .../internals/RocksDBKeyValueStoreTest.java     |  2 +-
 155 files changed, 477 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 73543ad..59319ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -749,7 +749,6 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private class GroupCoordinatorMetrics {
-        public final Metrics metrics;
         public final String metricGrpName;
 
         public final Sensor heartbeatLatency;
@@ -757,7 +756,6 @@ public abstract class AbstractCoordinator implements Closeable {
         public final Sensor syncLatency;
 
         public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
-            this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             this.heartbeatLatency = metrics.sensor("heartbeat-latency");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bd95409..a8d94fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,13 +757,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     private class ConsumerCoordinatorMetrics {
-        public final Metrics metrics;
         public final String metricGrpName;
 
         public final Sensor commitLatency;
 
         public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
-            this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             this.commitLatency = metrics.sensor("commit-latency");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6dc2060..003d1a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -65,9 +65,6 @@ public class SubscriptionState {
     /* the list of topics the user has requested */
     private Set<String> subscription;
 
-    /* the list of partitions the user has requested */
-    private Set<TopicPartition> userAssignment;
-
     /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
     private final Set<String> groupSubscription;
 
@@ -86,7 +83,6 @@ public class SubscriptionState {
     public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
-        this.userAssignment = Collections.emptySet();
         this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
@@ -160,8 +156,6 @@ public class SubscriptionState {
         setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
         if (!this.assignment.partitionSet().equals(partitions)) {
-            this.userAssignment = partitions;
-
             Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
             for (TopicPartition partition : partitions) {
                 TopicPartitionState state = assignment.stateValue(partition);
@@ -218,7 +212,6 @@ public class SubscriptionState {
 
     public void unsubscribe() {
         this.subscription = Collections.emptySet();
-        this.userAssignment = Collections.emptySet();
         this.assignment.clear();
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3632384..489c762 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -318,7 +318,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.RETRIES_CONFIG),
                     this.metrics,
                     new SystemTime(),
-                    clientId,
                     this.requestTimeoutMs);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8fc7f2c..c71bb67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -92,9 +92,6 @@ public class Sender implements Runnable {
     /* metrics */
     private final SenderMetrics sensors;
 
-    /* param clientId of the client */
-    private String clientId;
-
     /* the max time to wait for the server to respond to the request*/
     private final int requestTimeout;
 
@@ -107,7 +104,6 @@ public class Sender implements Runnable {
                   int retries,
                   Metrics metrics,
                   Time time,
-                  String clientId,
                   int requestTimeout) {
         this.client = client;
         this.accumulator = accumulator;
@@ -118,7 +114,6 @@ public class Sender implements Runnable {
         this.acks = acks;
         this.retries = retries;
         this.time = time;
-        this.clientId = clientId;
         this.sensors = new SenderMetrics(metrics);
         this.requestTimeout = requestTimeout;
     }
@@ -281,8 +276,7 @@ public class Sender implements Runnable {
                     completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
                 }
                 this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
-                this.sensors.recordThrottleTime(response.request().request().destination(),
-                                                produceResponse.getThrottleTime());
+                this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (RecordBatch batch : batches.values())
@@ -564,7 +558,7 @@ public class Sender implements Runnable {
             }
         }
 
-        public void recordThrottleTime(String node, long throttleTimeMs) {
+        public void recordThrottleTime(long throttleTimeMs) {
             this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
index 0e14a39..11f5e07 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
@@ -34,7 +34,6 @@ public class MultiSend implements Send {
     private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
     private String dest;
     private long totalWritten = 0;
-    private List<Send> sends;
     private Iterator<Send> sendsIterator;
     private Send current;
     private boolean doneSends = false;
@@ -42,7 +41,6 @@ public class MultiSend implements Send {
 
     public MultiSend(String dest, List<Send> sends) {
         this.dest = dest;
-        this.sends = sends;
         this.sendsIterator = sends.iterator();
         nextSendOrDone();
         for (Send send: sends)
@@ -76,7 +74,7 @@ public class MultiSend implements Send {
             throw new KafkaException("This operation cannot be completed on a complete request.");
 
         int totalWrittenPerCall = 0;
-        boolean sendComplete = false;
+        boolean sendComplete;
         do {
             long written = current.writeTo(channel);
             totalWritten += written;
@@ -97,4 +95,4 @@ public class MultiSend implements Send {
         else
             doneSends = true;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index f0af935..c573672 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -36,17 +36,15 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
     }
 
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
-        KafkaChannel channel = null;
         try {
             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {
             log.warn("Failed to create channel due to ", e);
             throw new KafkaException(e);
         }
-        return channel;
     }
 
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b546174..1d612bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -46,17 +46,15 @@ public class SslChannelBuilder implements ChannelBuilder {
     }
 
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
-        KafkaChannel channel = null;
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);
         }
-        return channel;
     }
 
     public void close()  {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index f8926f3..7ce59f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -397,12 +397,11 @@ public class SslTransportLayer implements TransportLayer {
     private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
         log.trace("SSLHandshake handshakeUnwrap {}", channelId);
         SSLEngineResult result;
-        boolean cont = false;
-        int read = 0;
         if (doRead)  {
-            read = socketChannel.read(netReadBuffer);
+            int read = socketChannel.read(netReadBuffer);
             if (read == -1) throw new EOFException("EOF during handshake.");
         }
+        boolean cont;
         do {
             //prepare the buffer with the incoming data
             netReadBuffer.flip();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e1074a1..206fe39 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -63,7 +63,6 @@ import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractRequestResponse;
-import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -310,7 +309,7 @@ public class SaslServerAuthenticator implements Authenticator {
                 LOG.debug("Handle Kafka request {}", apiKey);
                 switch (apiKey) {
                     case API_VERSIONS:
-                        handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+                        handleApiVersionsRequest(requestHeader);
                         break;
                     case SASL_HANDSHAKE:
                         clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
@@ -361,7 +360,7 @@ public class SaslServerAuthenticator implements Authenticator {
         }
     }
 
-    private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException, UnsupportedSaslMechanismException {
+    private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException {
         sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b556240..d305e8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -153,14 +153,13 @@ public class NetworkClientTest {
 
     @Test
     public void testLeastLoadedNode() {
-        Node leastNode = null;
         client.ready(node, time.milliseconds());
         awaitReady(client, node);
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
         
         // leastloadednode should be our single node
-        leastNode = client.leastLoadedNode(time.milliseconds());
+        Node leastNode = client.leastLoadedNode(time.milliseconds());
         assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
         
         // sleep for longer than reconnect backoff

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 48682b1..3756d8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -88,7 +88,7 @@ public class BufferPoolTest {
         ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
-        buffer = pool.allocate(1025, maxBlockTimeMs);
+        pool.allocate(1025, maxBlockTimeMs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b7f9e74..b7645dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -83,7 +83,6 @@ public class SenderTest {
                             MAX_RETRIES,
                             metrics,
                             time,
-                            CLIENT_ID,
                             REQUEST_TIMEOUT);
 
         metadata.update(cluster, time.milliseconds());
@@ -143,7 +142,6 @@ public class SenderTest {
                                        maxRetries,
                                        m,
                                        time,
-                                       "clientId",
                                        REQUEST_TIMEOUT);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
@@ -196,7 +194,6 @@ public class SenderTest {
                 maxRetries,
                 m,
                 time,
-                "clientId",
                 REQUEST_TIMEOUT);
 
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
index c447f6d..cd7ed4a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.data;
 
 import org.apache.kafka.connect.errors.DataException;
 
-import java.util.TimeZone;
-
 /**
  * <p>
  *     A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
@@ -30,8 +28,6 @@ import java.util.TimeZone;
 public class Timestamp {
     public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp";
 
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
     /**
      * Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder you can override additional schema settings such
      * as required/optional, default value, and documentation.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 8a065f1..88a0a8d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -52,7 +52,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     private final String restUrl;
     private final ConfigBackingStore configStorage;
     private ConnectProtocol.Assignment assignmentSnapshot;
-    private final WorkerCoordinatorMetrics sensors;
     private ClusterConfigState configSnapshot;
     private final WorkerRebalanceListener listener;
     private LeaderState leaderState;
@@ -86,7 +85,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;
-        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
+        new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.listener = listener;
         this.rejoinRequested = false;
     }
@@ -306,11 +305,9 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     private class WorkerCoordinatorMetrics {
-        public final Metrics metrics;
         public final String metricGrpName;
 
         public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
-            this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             Measurable numConnectors = new Measurable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 9567ef9..3faff65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -22,8 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -74,11 +72,4 @@ public class ConnectorInfo {
         return Objects.hash(name, config, tasks);
     }
 
-
-    private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.connect.util.ConnectorTaskId> tasks) {
-        List<ConnectorTaskId> jsonTasks = new ArrayList<>();
-        for (ConnectorTaskId task : tasks)
-            jsonTasks.add(task);
-        return jsonTasks;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index a098535..58c966d 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -84,7 +84,6 @@ object AclCommand {
         CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
 
       for ((resource, acls) <- resourceToAcl) {
-        val acls = resourceToAcl(resource)
         println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
         authorizer.addAcls(acls, resource)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index aa38f69..d3ce217 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -320,7 +320,7 @@ object AdminUtils extends Logging with AdminUtilities {
         try {
           zkUtils.createPersistentPath(getDeleteTopicPath(topic))
         } catch {
-          case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+          case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
             "topic %s is already marked for deletion".format(topic))
           case e2: Throwable => throw new AdminOperationException(e2)
         }
@@ -471,7 +471,7 @@ object AdminUtils extends Logging with AdminUtilities {
       }
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
-      case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
+      case _: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
@@ -593,7 +593,7 @@ object AdminUtils extends Logging with AdminUtilities {
             case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
           }
 
-        case o => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
+        case _ => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
       }
     }
     props

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 20048ec..34df6b0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -99,7 +99,7 @@ object ConfigCommand extends Config {
   private def parseBroker(broker: String): Int = {
     try broker.toInt
     catch {
-      case e: NumberFormatException =>
+      case _: NumberFormatException =>
         throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
     }
   }
@@ -190,20 +190,20 @@ object ConfigCommand extends Config {
           val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
                                    .map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
           child match {
-            case Some (s) =>
+            case Some(s) =>
                 rootEntities.flatMap(rootEntity =>
                   ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
             case None => rootEntities
           }
-        case (rootName, Some(childEntity)) =>
+        case (_, Some(childEntity)) =>
           childEntity.sanitizedName match {
-            case Some(subName) => Seq(this)
+            case Some(_) => Seq(this)
             case None =>
                 zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
                        .map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
 
           }
-        case (rootName, None) =>
+        case (_, None) =>
           Seq(this)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 354e6a2..6300d76 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -253,7 +253,7 @@ object ConsumerGroupCommand extends Logging {
       }
 
       assignmentRows ++= groupConsumerIds.sortBy(- consumerTopicPartitions.get(_).size).flatMap { consumerId =>
-        topicsByConsumerId(consumerId).flatMap { topic =>
+        topicsByConsumerId(consumerId).flatMap { _ =>
           // since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
           // since consumer id is repeated in client id, leave host and client id empty
           collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d194eca..81014b1 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -109,7 +109,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       zkUtils.createPersistentPath(zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))
     } catch {
-      case nee: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         val partitionsUndergoingPreferredReplicaElection =
           PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
         throw new AdminOperationException("Preferred replica leader election currently in progress for " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index a037fd4..709b365 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -218,7 +218,7 @@ object ReassignPartitionsCommand extends Logging {
                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
     val newReplicas = partitionsToBeReassigned(topicAndPartition)
     partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(partition) => ReassignmentInProgress
+      case Some(_) => ReassignmentInProgress
       case None =>
         // check if the current replica assignment matches the expected one after reassignment
         val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
@@ -394,7 +394,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
         true
       }
     } catch {
-      case ze: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
         throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index a35a989..2fcc2ce 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -176,11 +176,11 @@ object TopicCommand extends Logging {
           println("Note: This will have no impact if delete.topic.enable is not set to true.")
         }
       } catch {
-        case e: ZkNodeExistsException =>
+        case _: ZkNodeExistsException =>
           println("Topic %s is already marked for deletion.".format(topic))
         case e: AdminOperationException =>
           throw e
-        case e: Throwable =>
+        case _: Throwable =>
           throw new AdminOperationException("Error while deleting topic %s".format(topic))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a87e5b7..9ffee86 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -123,7 +123,6 @@ object ZkSecurityMigrator extends Logging {
 }
 
 class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
-  private val workQueue = new LinkedBlockingQueue[Runnable]
   private val futures = new Queue[Future[String]]
 
   private def setAcl(path: String, setPromise: Promise[String]) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 02eeae1..1ba5cfa 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -30,7 +30,7 @@ object ControlledShutdownResponse {
     val numEntries = buffer.getInt
 
     var partitionsRemaining = Set[TopicAndPartition]()
-    for (i<- 0 until numEntries){
+    for (_ <- 0 until numEntries){
       val topic = readShortString(buffer)
       val partition = buffer.getInt
       partitionsRemaining += new TopicAndPartition(topic, partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 3c380c9..57e99c1 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -69,7 +69,7 @@ object FetchRequest {
     val groupedByTopic = requestInfo.groupBy { case (tp, _) => tp.topic }.map { case (topic, values) =>
       topic -> random.shuffle(values)
     }
-    random.shuffle(groupedByTopic.toSeq).flatMap { case (topic, partitions) =>
+    random.shuffle(groupedByTopic.toSeq).flatMap { case (_, partitions) =>
       partitions.map { case (tp, fetchInfo) => tp -> fetchInfo }
     }
   }
@@ -196,9 +196,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val fetchResponsePartitionData = requestInfo.map {
-      case (topicAndPartition, data) =>
-        (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
+    val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) =>
+      (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
     }
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index e5813a5..9123788 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -46,10 +46,10 @@ object PartitionStateInfo {
     val leader = buffer.getInt
     val leaderEpoch = buffer.getInt
     val isrSize = buffer.getInt
-    val isr = for(i <- 0 until isrSize) yield buffer.getInt
+    val isr = for (_ <- 0 until isrSize) yield buffer.getInt
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
+    val replicas = for (_ <- 0 until replicationFactor) yield buffer.getInt
     PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
                        replicas.toSet)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index d4f6158..94223c7 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -48,7 +48,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
-  def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != Errors.NONE.code }
+  def hasError = commitStatus.values.exists(_ != Errors.NONE.code)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index b15cf5a..416dd73 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -114,8 +114,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val partitionOffsetResponseMap = requestInfo.map {
-      case (topicAndPartition, partitionOffsetRequest) =>
+    val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
         (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
@@ -133,4 +132,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
       offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index aad2fa5..3ca7bd7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -133,9 +133,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
         requestChannel.closeConnection(request.processor, request)
     }
     else {
-      val producerResponseStatus = data.map {
-        case (topicAndPartition, data) =>
-          (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
+      val producerResponseStatus = data.map { case (topicAndPartition, _) =>
+        (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fda8b0b..8893697 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -121,7 +121,7 @@ object ClientUtils extends Logging{
            debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
            true
          } catch {
-           case e: Exception =>
+           case _: Exception =>
              if (channel != null) channel.disconnect()
              channel = null
              info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
@@ -164,7 +164,7 @@ object ClientUtils extends Logging{
            }
          }
          catch {
-           case ioe: IOException =>
+           case _: IOException =>
              info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
              queryChannel.disconnect()
          }
@@ -187,7 +187,7 @@ object ClientUtils extends Logging{
            queryChannel.disconnect()
          }
          catch {
-           case ioe: IOException => // offsets manager may have moved
+           case _: IOException => // offsets manager may have moved
              info("Error while connecting to %s.".format(connectString))
              if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
              Thread.sleep(retryBackOffMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 91823f0..847e959 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -32,7 +32,7 @@ object BrokerEndPoint {
    */
   def parseHostPort(connectionString: String): Option[(String, Int)] = {
     connectionString match {
-      case uriParseExp(host, port) => try Some(host, port.toInt) catch { case e: NumberFormatException => None }
+      case uriParseExp(host, port) => try Some(host, port.toInt) catch { case _: NumberFormatException => None }
       case _ => None
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 28d3c8d..4d3fb56 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -415,11 +415,10 @@ class Partition(val topic: String,
      * is violated, that replica is considered to be out of sync
      *
      **/
-    val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
 
     val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
-    if(laggingReplicas.nonEmpty)
+    if (laggingReplicas.nonEmpty)
       debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
 
     laggingReplicas

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index dfb203a..13c1921 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -38,12 +38,7 @@ class Replica(val brokerId: Int,
   val topic = partition.topic
   val partitionId = partition.partitionId
 
-  def isLocal: Boolean = {
-    log match {
-      case Some(l) => true
-      case None => false
-    }
-  }
+  def isLocal: Boolean = log.isDefined
 
   private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 580ae33..ef8190c 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -91,7 +91,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
-            val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
+            val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
             data.map(notificationHandler.processNotification(_)).getOrElse {
               logger.warn(s"read null data from $changeZnode when processing notification $notification")
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 96fe690..900a4b6 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -72,7 +72,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
 
-    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
     val partitionAssignment =
       new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
 
@@ -131,7 +131,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 class RangeAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
-    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
     val partitionAssignment =
       new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
     for (topic <- ctx.myTopicThreadIds.keySet) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 5706d3c..eb035f2 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -140,8 +140,8 @@ private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
 
   def pattern: String = {
     topicFilter match {
-      case wl: Whitelist => TopicCount.whiteListPattern
-      case bl: Blacklist => TopicCount.blackListPattern
+      case _: Whitelist => TopicCount.whiteListPattern
+      case _: Blacklist => TopicCount.blackListPattern
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 1ab4e5c..914e9b9 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -34,7 +34,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
     Pattern.compile(regex)
   }
   catch {
-    case e: PatternSyntaxException =>
+    case _: PatternSyntaxException =>
       throw new RuntimeException(regex + " is an invalid regex.")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578..81b6264 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -322,8 +322,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def commitOffsets(isAutoCommit: Boolean) {
 
     val offsetsToCommit =
-      immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
-        partitionTopicInfos.map { case (partition, info) =>
+      immutable.Map(topicRegistry.values.flatMap { partitionTopicInfos =>
+        partitionTopicInfos.values.map { info =>
           TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
         }
       }.toSeq: _*)
@@ -442,7 +442,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             trace("Offset fetch response: %s.".format(offsetFetchResponse))
 
             val (leaderChanged, loadInProgress) =
-              offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
+              offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
                 (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
                  folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
               }
@@ -706,7 +706,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
         val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
-          valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
+          valueFactory = Some((_: String) => new Pool[Int, PartitionTopicInfo]))
 
         // fetch current offsets for all topic-partitions
         val topicPartitions = partitionAssignment.keySet.toSeq
@@ -731,7 +731,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           if(reflectPartitionOwnershipDecision(partitionAssignment)) {
             allTopicsOwnedPartitionsCount = partitionAssignment.size
 
-            partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+            partitionAssignment.view.groupBy { case (topicPartition, _) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
               newGauge("OwnedPartitionsCount",
                 new Gauge[Int] {
@@ -851,11 +851,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           successfullyOwnedPartitions ::= (topic, partition)
           true
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
             info("waiting for the partition ownership to be deleted: " + partition)
             false
-          case e2: Throwable => throw e2
         }
       }
       val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
@@ -918,19 +917,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicCount.getConsumerThreadIdsPerTopic
 
     val allQueuesAndStreams = topicCount match {
-      case wildTopicCount: WildcardTopicCount =>
+      case _: WildcardTopicCount =>
         /*
          * Wild-card consumption streams share the same queues, so we need to
          * duplicate the list for the subsequent zip operation.
          */
         (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
-      case statTopicCount: StaticTopicCount =>
+      case _: StaticTopicCount =>
         queuesAndStreams
     }
 
-    val topicThreadIds = consumerThreadIdsPerTopic.map {
-      case(topic, threadIds) =>
-        threadIds.map((topic, _))
+    val topicThreadIds = consumerThreadIdsPerTopic.map { case (topic, threadIds) =>
+      threadIds.map((topic, _))
     }.flatten
 
     require(topicThreadIds.size == allQueuesAndStreams.size,
@@ -988,7 +986,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         "message streams by filter at most once.")
 
     private val wildcardQueuesAndStreams = (1 to numStreams)
-      .map(e => {
+      .map(_ => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](queue,
                                           config.consumerTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 063ea6f..b7b4d71 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -77,25 +77,23 @@ class ControllerContext(val zkUtils: ZkUtils,
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
 
   def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
-    partitionReplicaAssignment
-      .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
-      .map { case(topicAndPartition, replicas) => topicAndPartition }
-      .toSet
+    partitionReplicaAssignment.collect {
+      case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
+    }.toSet
   }
 
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
     brokerIds.flatMap { brokerId =>
-      partitionReplicaAssignment
-        .filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }
-        .map { case (topicAndPartition, replicas) =>
+      partitionReplicaAssignment.collect {
+        case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
           new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
-        }
+      }
     }.toSet
   }
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
     partitionReplicaAssignment
-      .filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+      .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
       .flatMap { case (topicAndPartition, replicas) =>
         replicas.map { r =>
           new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
@@ -103,10 +101,8 @@ class ControllerContext(val zkUtils: ZkUtils,
       }.toSet
   }
 
-  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
-    partitionReplicaAssignment
-      .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
-  }
+  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
+    partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
 
   def allLiveReplicas(): Set[PartitionAndReplica] = {
     replicasOnBrokers(liveBrokerIds)
@@ -144,7 +140,7 @@ object KafkaController extends Logging {
         case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
       }
     } catch {
-      case t: Throwable =>
+      case _: Throwable =>
         // It may be due to an incompatible controller register version
         warn("Failed to parse the controller info as json. "
           + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
@@ -433,7 +429,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
-      case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
     }
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
     // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
@@ -622,12 +618,11 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     val newReplicas = reassignedPartitionContext.newReplicas
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
     try {
       val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
       assignedReplicasOpt match {
         case Some(assignedReplicas) =>
-          if(assignedReplicas == newReplicas) {
+          if (assignedReplicas == newReplicas) {
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
@@ -706,7 +701,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
         controllerContext.epoch = newControllerEpoch
       }
     } catch {
-      case nne: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         // if path doesn't exist, this is the first controller whose epoch should be 1
         // the following call can still fail if another controller gets elected between checking if the path exists and
         // trying to create the controller epoch path
@@ -715,7 +710,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
           controllerContext.epoch = KafkaController.InitialControllerEpoch
           controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
         } catch {
-          case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+          case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
             "Aborting controller startup procedure")
           case oe: Throwable => error("Error while incrementing controller epoch", oe)
         }
@@ -788,7 +783,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
 
   private def initializeTopicDeletion() {
     val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
-    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
+    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) =>
       replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
     val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
@@ -989,7 +984,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
-      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
+      case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
       case e2: Throwable => throw new KafkaException(e2.toString)
     }
   }
@@ -1182,7 +1177,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       inLock(controllerContext.controllerLock) {
         preferredReplicasForTopicsByBrokers =
           controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
-            case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+            case (_, assignedReplicas) => assignedReplicas.head
           }
       }
       debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
@@ -1192,13 +1187,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
           var imbalanceRatio: Double = 0
           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
           inLock(controllerContext.controllerLock) {
-            topicsNotInPreferredReplica =
-              topicAndPartitionsForBroker.filter {
-                case(topicPartition, replicas) => {
-                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
-                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
-                }
-              }
+            topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+              controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
+                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+            }
             debug("topics not in preferred replica " + topicsNotInPreferredReplica)
             val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
             val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
@@ -1208,18 +1200,16 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            topicsNotInPreferredReplica.foreach {
-              case(topicPartition, replicas) => {
-                inLock(controllerContext.controllerLock) {
-                  // do this check only if the broker is live and there are no partitions being reassigned currently
-                  // and preferred replica election is not in progress
-                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                      controllerContext.partitionsBeingReassigned.isEmpty &&
-                      controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
-                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
-                      controllerContext.allTopics.contains(topicPartition.topic)) {
-                    onPreferredReplicaElection(Set(topicPartition), true)
-                  }
+            topicsNotInPreferredReplica.keys.foreach { topicPartition =>
+              inLock(controllerContext.controllerLock) {
+                // do this check only if the broker is live and there are no partitions being reassigned currently
+                // and preferred replica election is not in progress
+                if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                    controllerContext.partitionsBeingReassigned.isEmpty &&
+                    controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
+                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+                    controllerContext.allTopics.contains(topicPartition.topic)) {
+                  onPreferredReplicaElection(Set(topicPartition), true)
                 }
               }
             }
@@ -1373,7 +1363,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
 
   private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
     val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
-    val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
+    val (jsonOpt, _) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
     if (jsonOpt.isDefined) {
       val json = Json.parseFull(jsonOpt.get)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 32bf4da..ee94b46 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -238,7 +238,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * zookeeper
    */
   private def initializePartitionState() {
-    for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
+    for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
       // check if leader and isr path exists for partition. If not, then it is in NEW state
       controllerContext.partitionLeadershipInfo.get(topicPartition) match {
         case Some(currentLeaderIsrAndEpoch) =>
@@ -297,7 +297,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
             topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             // read the controller epoch
             val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
               topicAndPartition.partition).get
@@ -357,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, replicas)
     } catch {
-      case lenne: LeaderElectionNotNeededException => // swallow
+      case _: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
       case sce: Throwable =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
@@ -430,8 +430,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
-            if(newTopics.nonEmpty)
-              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
+            if (newTopics.nonEmpty)
+              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
           } catch {
             case e: Throwable => error("Error while handling new topic", e )
           }
@@ -522,7 +522,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             if (partitionsToBeAdded.nonEmpty) {
               info("New partitions to be added %s".format(partitionsToBeAdded))
               controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
-              controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
+              controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
             }
           }
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d4e9bb4..03887ae 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -246,7 +246,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean =
             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
-              case Some(currLeaderIsrAndControllerEpoch) =>
+              case Some(_) =>
                 controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                   case Some(updatedLeaderIsrAndControllerEpoch) =>
                     // send the shrunk ISR state change request to all the remaining alive replicas of the partition.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 98057dd..8e5f3a1 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -342,8 +342,8 @@ class TopicDeletionManager(controller: KafkaController,
    *@param replicasForTopicsToBeDeleted
    */
   private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
-    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
-      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
+    replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
+      val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
       val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
       val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
       val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 79d4411..e0c8e65 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -265,7 +265,7 @@ class GroupMetadataManager(val brokerId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
     // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f29cde7..6b57696 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -290,7 +290,7 @@ class Log(val dir: File,
         try {
           curr.recover(config.maxMessageSize)
         } catch {
-          case e: InvalidOffsetException =>
+          case _: InvalidOffsetException =>
             val startOffset = curr.baseOffset
             warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                  "creating an empty one with starting offset " + startOffset)
@@ -627,7 +627,7 @@ class Log(val dir: File,
       val fetchDataInfo = read(offset, 1)
       fetchDataInfo.fetchOffsetMetadata
     } catch {
-      case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bb8a89a..34b0dbf 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -240,7 +240,7 @@ class LogCleaner(val config: CleanerConfig,
             recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
             endOffset = nextDirtyOffset
           } catch {
-            case pe: LogCleaningAbortedException => // task can be aborted, let it go.
+            case _: LogCleaningAbortedException => // task can be aborted, let it go.
           } finally {
             cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b3e6e72..b808348 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -112,12 +112,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     */
   def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
     inLock(lock) {
-      val toClean = logs.filterNot {
-        case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
-      }.filter {
-        case (topicAndPartition, log) => isCompactAndDelete(log)
+      val toClean = logs.filter { case (topicAndPartition, log) =>
+        !inProgress.contains(topicAndPartition) && isCompactAndDelete(log)
       }
-      toClean.foreach{x => inProgress.put(x._1, LogCleaningInProgress)}
+      toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
     }
 
@@ -317,4 +315,4 @@ private[log] object LogCleanerManager extends Logging {
 
     (firstDirtyOffset, firstUncleanableDirtyOffset)
   }
-}
\ No newline at end of file
+}


[3/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables

Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 1ef91b9..850b0e0 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -110,7 +110,7 @@ object ByteBufferMessageSet {
           while (true)
             innerMessageAndOffsets.add(readMessageFromStream(compressed))
         } catch {
-          case eofe: EOFException =>
+          case _: EOFException =>
             // we don't do anything at all here, because the finally
             // will close the compressed input stream, and we simply
             // want to return the innerMessageAndOffsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 7daab67..13b57e3 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -158,23 +158,16 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   )
 
   private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
-    val filteredTags = tags
-      .filter { case (tagKey, tagValue) => tagValue != ""}
+    val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
     if (filteredTags.nonEmpty) {
-      val tagsString = filteredTags
-        .map { case (key, value) => "%s=%s".format(key, value)}
-        .mkString(",")
-
+      val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
       Some(tagsString)
     }
-    else {
-      None
-    }
+    else None
   }
 
   private def toScope(tags: collection.Map[String, String]): Option[String] = {
-    val filteredTags = tags
-      .filter { case (tagKey, tagValue) => tagValue != ""}
+    val filteredTags = tags.filter { case (_, tagValue) => tagValue != ""}
     if (filteredTags.nonEmpty) {
       // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
       val tagsString = filteredTags
@@ -184,9 +177,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
 
       Some(tagsString)
     }
-    else {
-      None
-    }
+    else None
   }
 
   def removeAllConsumerMetrics(clientId: String) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 5408e0d..0f10577 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -82,7 +82,7 @@ class BlockingChannel( val host: String,
                          connectTimeoutMs))
 
       } catch {
-        case e: Throwable => disconnect()
+        case _: Throwable => disconnect()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 6b10e51..f793811 100755
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -24,8 +24,6 @@ import org.apache.kafka.common.utils.Utils
 @deprecated("This class has been deprecated and will be removed in a future release. " +
             "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
 class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  private val random = new java.util.Random
-  
   def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index c2f95ea..2d2bfdb 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -104,7 +104,7 @@ class Producer[K,V](val config: ProducerConfig,
             }
           }
           catch {
-            case e: InterruptedException =>
+            case _: InterruptedException =>
               false
           }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 797c77b..17d09ce 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -25,7 +25,7 @@ object Resource {
   def fromString(str: String): Resource = {
     str.split(Separator, 2) match {
       case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name)
-      case s => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+      case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 72f79d5..5cfdcd6 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -316,13 +316,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     try {
       zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         try {
           debug(s"Node $path does not exist, attempting to create it.")
           zkUtils.createPersistentPath(path, data)
           (true, 0)
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             debug(s"Failed to create node for $path because it already exists.")
             (false, 0)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index d87a8cf..5e584ab 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -97,7 +97,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 
   def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
     mapLock synchronized {
-      for ((key, fetcher) <- fetcherThreadMap)
+      for (fetcher <- fetcherThreadMap.values)
         fetcher.removePartitions(partitions)
     }
     info("Removed fetcher for partitions %s".format(partitions.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8cb2270..325c7af 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -127,7 +127,7 @@ class AdminManager(val config: KafkaConfig,
           AdminUtils.deleteTopic(zkUtils, topic)
           DeleteTopicMetadata(topic, Errors.NONE)
         } catch {
-          case e: TopicAlreadyMarkedForDeletionException =>
+          case _: TopicAlreadyMarkedForDeletionException =>
             // swallow the exception, and still track deletion allowing multiple calls to wait for deletion
             DeleteTopicMetadata(topic, Errors.NONE)
           case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 00e5d0c..cc2c4cd 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -65,7 +65,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
             throw new IOException("Unrecognized version of the server meta.properties file: " + version)
         }
       } catch {
-        case e: FileNotFoundException =>
+        case _: FileNotFoundException =>
           warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
           None
         case e1: Exception =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c4472c6..0c7c26b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -183,7 +183,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       // trigger the callback immediately if quota is not violated
       callback(0)
     } catch {
-      case qve: QuotaViolationException =>
+      case _: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
         throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
@@ -412,9 +412,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
           logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
           (sanitizedUser, clientId) match {
-            case (Some(u), Some(c)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-            case (Some(u), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-            case (None, Some(c)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+            case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+            case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+            case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
             case (None, None) =>
           }
         case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 2feeae8..4bf04e6 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -112,10 +112,10 @@ class DelayedFetch(delayMs: Long,
             }
           }
         } catch {
-          case utpe: UnknownTopicOrPartitionException => // Case B
+          case _: UnknownTopicOrPartitionException => // Case B
             debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
             return forceComplete()
-          case nle: NotLeaderForPartitionException =>  // Case A
+          case _: NotLeaderForPartitionException =>  // Case A
             debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
             return forceComplete()
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index b31d838..2e9e714 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -88,7 +88,6 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
                            private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = SystemTime) extends Logging {
-  private var lastExecutedChange = -1L
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(json: String) = {
@@ -106,7 +105,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
                 "Supported versions are 1 and 2.")
           }
 
-        case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+        case _ => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
           "{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
           "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
           " Received: " + json)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1b723f..c6c8dbd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -755,7 +755,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
         java.util.Collections.emptyList())
     } catch {
-      case e: TopicExistsException => // let it go, possibly another broker created this topic
+      case _: TopicExistsException => // let it go, possibly another broker created this topic
         new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
           java.util.Collections.emptyList())
       case ex: Throwable  => // Catch all to prevent unhandled errors

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 117899b..0ae9124 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -43,7 +43,6 @@ class KafkaHealthcheck(brokerId: Int,
                        rack: Option[String],
                        interBrokerProtocolVersion: ApiVersion) extends Logging {
 
-  private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   private[server] val sessionExpireListener = new SessionExpireListener
 
   def startup() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index df46336..a39fe49 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -111,7 +111,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
             throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
         }
       } catch {
-        case e: NumberFormatException => throw malformedLineException(line)
+        case _: NumberFormatException => throw malformedLineException(line)
       } finally {
         reader.close()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32bc660..b43695a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,20 +223,20 @@ class ReplicaManager(val config: KafkaConfig,
       deletePartition.toString, topic, partitionId))
     val errorCode = Errors.NONE.code
     getPartition(topic, partitionId) match {
-      case Some(partition) =>
-        if(deletePartition) {
+      case Some(_) =>
+        if (deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
           if (removedPartition != null) {
             removedPartition.delete() // this will delete the local log
             val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
             if (!topicHasPartitions)
-                BrokerTopicStats.removeMetrics(topic)
+              BrokerTopicStats.removeMetrics(topic)
           }
         }
       case None =>
         // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
         // This could happen when topic is being deleted while broker is down and recovers.
-        if(deletePartition) {
+        if (deletePartition) {
           val topicAndPartition = TopicAndPartition(topic, partitionId)
 
           if(logManager.getLog(topicAndPartition).isDefined) {
@@ -358,10 +358,9 @@ class ReplicaManager(val config: KafkaConfig,
     } else {
       // If required.acks is outside accepted range, something is wrong with the client
       // Just return an error and don't handle the request at all
-      val responseStatus = messagesPerPartition.map {
-        case (topicAndPartition, messageSet) =>
-          topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
-            LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
+      val responseStatus = messagesPerPartition.map { case (topicAndPartition, _) =>
+        topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
       }
       responseCallback(responseStatus)
     }
@@ -657,11 +656,9 @@ class ReplicaManager(val config: KafkaConfig,
     replicaStateChangeLock synchronized {
       val responseMap = new mutable.HashMap[TopicPartition, Short]
       if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
         stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
           correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
-        }
         BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
@@ -694,7 +691,7 @@ class ReplicaManager(val config: KafkaConfig,
           }
         }
 
-        val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
+        val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
           stateInfo.leader == config.brokerId
         }
         val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
@@ -830,7 +827,7 @@ class ReplicaManager(val config: KafkaConfig,
         val newLeaderBrokerId = partitionStateInfo.leader
         metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
           // Only change partition state when the leader is available
-          case Some(leaderBroker) =>
+          case Some(_) =>
             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
               partitionsToMakeFollower += partition
             else

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 5c487bf..bb6caa0 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -83,7 +83,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
       leaderId = brokerId
       onBecomingLeader()
     } catch {
-      case e: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         // If someone else has written the path, then
         leaderId = getControllerID 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 56ae0c9..eea66f8 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -119,11 +119,11 @@ object ConsoleConsumer extends Logging {
       val msg: BaseConsumerRecord = try {
         consumer.receive()
       } catch {
-        case nse: StreamEndException =>
+        case _: StreamEndException =>
           trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
           // Consumer is already closed
           return
-        case nse: WakeupException =>
+        case _: WakeupException =>
           trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
           // Consumer will be closed
           return
@@ -358,7 +358,7 @@ object ConsoleConsumer extends Logging {
             val offset =
               try offsetString.toLong
               catch {
-                case e: NumberFormatException => invalidOffset(offsetString)
+                case _: NumberFormatException => invalidOffset(offsetString)
               }
             if (offset < 0) invalidOffset(offsetString)
             offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 63a04c9..2b3f56d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -71,7 +71,7 @@ object ConsumerPerformance {
       val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
       val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
       var threadList = List[ConsumerPerfThread]()
-      for ((topic, streamList) <- topicMessageStreams)
+      for (streamList <- topicMessageStreams.values)
         for (i <- 0 until streamList.length)
           threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f4f7acf..c299676 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -414,12 +414,10 @@ object DumpLogSegments {
         }
       }
 
-      shallowOffsetNotFound.foreach {
-        case (fileName, listOfShallowOffsetNotFound) => {
-          System.err.println("The following indexed offsets are not found in the log.")
-          listOfShallowOffsetNotFound.foreach(m => {
-            System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2))
-          })
+      shallowOffsetNotFound.values.foreach { listOfShallowOffsetNotFound =>
+        System.err.println("The following indexed offsets are not found in the log.")
+        listOfShallowOffsetNotFound.foreach { case (indexedOffset, logOffset) =>
+          System.err.println(s"Indexed offset: $indexedOffset, found log offset: $logOffset")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 1c92088..9aaad3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -122,8 +122,7 @@ object EndToEndLatency {
 
       //Check we only got the one message
       if (recordIter.hasNext) {
-        var count = 1
-        for (elem <- recordIter) count += 1
+        val count = 1 + recordIter.size
         throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 17b8f0b..1a6ba69 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -375,7 +375,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           mirrorMakerConsumer.commit()
           throw e
 
-        case e: CommitFailedException =>
+        case _: CommitFailedException =>
           warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
             "another instance. If you see this regularly, it could indicate that you need to either increase " +
             s"the consumer's ${consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
@@ -435,9 +435,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               maybeFlushAndCommitOffsets()
             }
           } catch {
-            case cte: ConsumerTimeoutException =>
+            case _: ConsumerTimeoutException =>
               trace("Caught ConsumerTimeoutException, continue iteration.")
-            case we: WakeupException =>
+            case _: WakeupException =>
               trace("Caught ConsumerWakeupException, continue iteration.")
           }
           maybeFlushAndCommitOffsets()
@@ -485,7 +485,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         mirrorMakerConsumer.stop()
       }
       catch {
-        case ie: InterruptedException =>
+        case _: InterruptedException =>
           warn("Interrupt during shutdown of the mirror maker thread")
       }
     }
@@ -495,7 +495,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         shutdownLatch.await()
         info("Mirror maker thread shutdown complete")
       } catch {
-        case ie: InterruptedException =>
+        case _: InterruptedException =>
           warn("Shutdown of the mirror maker thread interrupted")
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index d88ec41..4e2c7ef 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -32,9 +32,6 @@ object ReplayLogProducer extends Logging {
   def main(args: Array[String]) {
     val config = new Config(args)
 
-    val executor = Executors.newFixedThreadPool(config.numThreads)
-    val allDone = new CountDownLatch(config.numThreads)
-
     // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
     ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
     Thread.sleep(500)
@@ -51,7 +48,7 @@ object ReplayLogProducer extends Logging {
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
     var threadList = List[ZKConsumerThread]()
-    for ((topic, streamList) <- topicMessageStreams)
+    for (streamList <- topicMessageStreams.values)
       for (stream <- streamList)
         threadList ::= new ZKConsumerThread(config, stream)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 9a059df..01d3aa8 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -107,7 +107,7 @@ object ReplicaVerificationTool extends Logging {
       Pattern.compile(regex)
     }
     catch {
-      case e: PatternSyntaxException =>
+      case _: PatternSyntaxException =>
         throw new RuntimeException(regex + " is an invalid regex.")
     }
 
@@ -151,14 +151,13 @@ object ReplicaVerificationTool extends Logging {
           topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
           .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
     debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
-    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
-      topicMetadataResponse =>
-        topicMetadataResponse.partitionsMetadata.map(
-          partitionMetadata =>
-            (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))
-    ).groupBy(_._2)
-     .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map {
-        case(topicAndPartition, leaderId) => topicAndPartition })
+    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
+      topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
+        (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+      }
+    }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
+       topicAndPartition
+     })
     debug("Leaders per broker: " + leadersPerBroker)
 
     val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
@@ -236,8 +235,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
   }
 
   private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
-    offsetResponse.partitionErrorAndOffsets.filter {
-      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != Errors.NONE.code
+    offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
+      partitionOffsetsResponse.error != Errors.NONE.code
     }.mkString
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 1c059bb..99b5aae 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -271,8 +271,8 @@ object CoreUtils extends Logging {
    */
   def duplicates[T](s: Traversable[T]): Iterable[T] = {
     s.groupBy(identity)
-      .map{ case (k,l) => (k,l.size)}
-      .filter{ case (k,l) => l > 1 }
+      .map { case (k, l) => (k, l.size)}
+      .filter { case (_, l) => l > 1 }
       .keys
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index b43b4b1..896c300 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -52,7 +52,7 @@ class FileLock(val file: File) extends Logging {
         flock = channel.tryLock()
         flock != null
       } catch {
-        case e: OverlappingFileLockException => false
+        case _: OverlappingFileLockException => false
       }
     }
   }
@@ -77,4 +77,4 @@ class FileLock(val file: File) extends Logging {
       channel.close()
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index aa120ab..5d2549e 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -61,12 +61,10 @@ object Mx4jLoader extends Logging {
       true
     }
     catch {
-	  case e: ClassNotFoundException => {
+	  case _: ClassNotFoundException =>
         info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
-      }
-      case e: Throwable => {
+      case e: Throwable =>
         warn("Could not start register mbean in JMX", e)
-      }
     }
     false
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 31e8a92..369bb23 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -58,13 +58,13 @@ object ReplicationUtils extends Logging {
           (expectedLeader,writtenLeader) match {
             case (Some(expectedLeader),Some(writtenLeader)) =>
               if(expectedLeader == writtenLeader)
-                return (true,writtenStat.getVersion())
+                return (true, writtenStat.getVersion())
             case _ =>
           }
         case None =>
       }
     } catch {
-      case e1: Exception =>
+      case _: Exception =>
     }
     (false,-1)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index f57245f..9600b0a 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -181,7 +181,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
   /**
    * Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ...
    */
-  def getMap(name: String, valid: String => Boolean = s => true): Map[String, String] = {
+  def getMap(name: String, valid: String => Boolean = _ => true): Map[String, String] = {
     try {
       val m = CoreUtils.parseCsvMap(getString(name, ""))
       m.foreach {
@@ -208,7 +208,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
       CompressionCodec.getCompressionCodec(prop.toInt)
     }
     catch {
-      case nfe: NumberFormatException =>
+      case _: NumberFormatException =>
         CompressionCodec.getCompressionCodec(prop)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 80a9f1a..787cb8f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -237,7 +237,7 @@ class ZkUtils(val zkClient: ZkClient,
       createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
       proposedClusterId
     } catch {
-      case e: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
     }
   }
@@ -389,7 +389,7 @@ class ZkUtils(val zkClient: ZkClient,
                                                       isSecure)
       zkCheckedEphemeral.create()
     } catch {
-      case e: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
                 + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
                 + "else you have shutdown this broker and restarted it faster than the zookeeper "
@@ -445,7 +445,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       ZkPath.createEphemeral(zkClient, path, data, acls)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createEphemeral(zkClient, path, data, acls)
     }
@@ -465,7 +465,7 @@ class ZkUtils(val zkClient: ZkClient,
         try {
           storedData = readData(path)._1
         } catch {
-          case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
+          case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -484,7 +484,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       ZkPath.createPersistent(zkClient, path, data, acls)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createPersistent(zkClient, path, data, acls)
     }
@@ -503,12 +503,12 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.writeData(path, data)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         try {
           ZkPath.createPersistent(zkClient, path, data, acls)
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             zkClient.writeData(path, data)
         }
     }
@@ -573,7 +573,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.writeData(path, data)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         createParentPath(path)
         ZkPath.createEphemeral(zkClient, path, data, acls)
     }
@@ -583,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.delete(path)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
         false
@@ -599,7 +599,7 @@ class ZkUtils(val zkClient: ZkClient,
       zkClient.delete(path, expectedVersion)
       true
     } catch {
-      case e: ZkBadVersionException => false
+      case _: ZkBadVersionException => false
     }
   }
 
@@ -607,7 +607,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.deleteRecursive(path)
     } catch {
-      case e: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
     }
@@ -624,7 +624,7 @@ class ZkUtils(val zkClient: ZkClient,
     val dataAndStat = try {
                         (Some(zkClient.readData(path, stat)), stat)
                       } catch {
-                        case e: ZkNoNodeException =>
+                        case _: ZkNoNodeException =>
                           (None, stat)
                       }
     dataAndStat
@@ -642,7 +642,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       zkClient.getChildren(path)
     } catch {
-      case e: ZkNoNodeException => Nil
+      case _: ZkNoNodeException => Nil
     }
   }
 
@@ -754,7 +754,7 @@ class ZkUtils(val zkClient: ZkClient,
           updatePersistentPath(zkPath, jsonData)
           debug("Updated partition reassignment path with %s".format(jsonData))
         } catch {
-          case nne: ZkNoNodeException =>
+          case _: ZkNoNodeException =>
             createPersistentPath(zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
           case e2: Throwable => throw new AdminOperationException(e2.toString)
@@ -835,7 +835,7 @@ class ZkUtils(val zkClient: ZkClient,
     try {
       writeToZk
     } catch {
-      case e1: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         makeSurePersistentPathExists(path)
         writeToZk
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index ce91a30..f13f59f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -111,7 +111,6 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
   @Test
   def testDescribeConsumerGroupForNonExistentGroup() {
     val nonExistentGroup = "non" + groupId
-    val sum = client.describeConsumerGroup(nonExistentGroup).consumers
     assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 55e8e4f..8502ae0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -149,11 +149,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
 
-    for (i <- 0 until producerCount)
+    for (_ <- 0 until producerCount)
       producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
         maxBlockMs = 3000,
         acks = 1)
-    for (i <- 0 until consumerCount)
+    for (_ <- 0 until consumerCount)
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
@@ -339,7 +339,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
     } catch {
-      case e: TimeoutException => //expected
+      case _: TimeoutException => //expected
     }
   }
 
@@ -517,7 +517,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case _: TopicAuthorizationException => //expected
     }
   }
 
@@ -595,7 +595,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case _: TopicAuthorizationException => //expected
     } finally consumer.close()
   }
 
@@ -813,11 +813,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
   }
 
-  private def removeAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
-    servers.head.apis.authorizer.get.removeAcls(acls, resource)
-    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
-  }
-
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
                              numRecords: Int = 1,
                              startingOffset: Int = 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 102b7cf..732b99f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -280,7 +280,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
       try {
         consumer.poll(50)
       } catch {
-        case e: WakeupException => // ignore for shutdown
+        case _: WakeupException => // ignore for shutdown
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 816f36a..b5aaaf4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -142,12 +142,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
-        case iae: IllegalArgumentException => // this is ok
-        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+        case _: IllegalArgumentException => // this is ok
       }
 
       // non-blocking send a list of records
-      for (i <- 1 to numRecords)
+      for (_ <- 1 to numRecords)
         producer.send(record0, callback)
 
       // check that all messages have been acked via offset
@@ -234,7 +233,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
-      for (i <- 1 to numRecords)
+      for (_ <- 1 to numRecords)
         producer.send(record0)
       val response0 = producer.send(record0)
 
@@ -328,8 +327,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes))
       fail("Should not allow sending a record to a partition not present in the metadata")
     } catch {
-      case ke: KafkaException => // this is ok
-      case e: Throwable => fail("Only expecting KafkaException", e)
+      case _: KafkaException => // this is ok
     }
 
     AdminUtils.addPartitions(zkUtils, topic, 2)
@@ -370,8 +368,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     try {
       TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
-      for (i <- 0 until 50) {
-        val responses = (0 until numRecords) map (i => producer.send(record))
+      for (_ <- 0 until 50) {
+        val responses = (0 until numRecords) map (_ => producer.send(record))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         producer.flush()
         assertTrue("All requests are complete.", responses.forall(_.isDone()))
@@ -389,15 +387,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     // create topic
     val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
     val leader0 = leaders(0)
-    val leader1 = leaders(1)
 
     // create record
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from caller thread.
-    for (i <- 0 until 50) {
+    for (_ <- 0 until 50) {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
-      val responses = (0 until numRecords) map (i => producer.send(record0))
+      val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       producer.close(0, TimeUnit.MILLISECONDS)
       responses.foreach { future =>
@@ -436,7 +433,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         // Trigger another batch in accumulator before close the producer. These messages should
         // not be sent.
         if (sendRecords)
-          (0 until numRecords) foreach (i => producer.send(record))
+          (0 until numRecords) foreach (_ => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence of the close call.
         producer.close(0, TimeUnit.MILLISECONDS)
         // Test close with non zero timeout. Should not block at all.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4e6c740..479e749 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -276,7 +276,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
 
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
-    servers.foreach { s =>
+    servers.foreach { _ =>
       TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index b26b242..d15a01d 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
 object FixedPortTestUtils {
   def choosePorts(count: Int): Seq[Int] = {
     try {
-      val sockets = (0 until count).map(i => new ServerSocket(0))
+      val sockets = (0 until count).map(_ => new ServerSocket(0))
       val ports = sockets.map(_.getLocalPort())
       sockets.foreach(_.close())
       ports

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ca020a6..83280dc 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -65,9 +65,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.putAll(consumerSecurityProps)
-    for (i <- 0 until producerCount)
+    for (_ <- 0 until producerCount)
       producers += createNewProducer
-    for (i <- 0 until consumerCount) {
+    for (_ <- 0 until consumerCount) {
       consumers += createNewConsumer
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index d18dc3a..aefe5bd 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -782,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // create a group of consumers, subscribe the consumers to all the topics and start polling
     // for the topic partition assignment
-    val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
+    val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
     try {
       validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
 
@@ -862,10 +862,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       testProducer.send(null, null)
       fail("Should not allow sending a null record")
     } catch {
-      case e: Throwable => {
+      case _: Throwable =>
         assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
         assertEquals("Interceptor should not receive metadata with an exception when record is null", 0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
-      }
     }
 
     // create consumer with interceptor
@@ -1222,7 +1221,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
     TestUtils.waitUntilTrue(() => {
-      val records = consumer0.poll(50)
+      consumer0.poll(50)
       consumer0.assignment() == newAssignment.asJava
     }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
 
@@ -1335,7 +1334,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                               subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
     assertTrue(consumerCount <= subscriptions.size)
     val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-    for (i <- 0 until consumerCount)
+    for (_ <- 0 until consumerCount)
       consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
     consumers ++= consumerGroup
 
@@ -1364,7 +1363,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                                    topicsToSubscribe: List[String],
                                                    subscriptions: Set[TopicPartition]): Unit = {
     assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
-    for (i <- 0 until numOfConsumersToAdd) {
+    for (_ <- 0 until numOfConsumersToAdd) {
       val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
       consumerGroup += newConsumer
       consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe)
@@ -1415,7 +1414,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                                             rebalanceListener: ConsumerRebalanceListener): Unit = {
     consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
     TestUtils.waitUntilTrue(() => {
-      val records = consumer.poll(50)
+      consumer.poll(50)
       consumer.assignment() == subscriptions.asJava
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 734eb66..a75e7c7 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -39,7 +39,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       createNewProducerWithNoSerializer(brokerList)
       fail("Instantiating a producer without specifying a serializer should cause a ConfigException")
     } catch {
-      case ce : ConfigException => // this is ok
+      case _ : ConfigException => // this is ok
     }
 
     // create a producer with explicit serializers should succeed
@@ -67,7 +67,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       producer.send(record5)
       fail("Should have gotten a SerializationException")
     } catch {
-      case se: SerializationException => // this is ok
+      case _: SerializationException => // this is ok
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5994a1d..8d676d1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -88,7 +88,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     scheduler.start
 
     // rolling bounce brokers
-    for (i <- 0 until numServers) {
+    for (_ <- 0 until numServers) {
       for (server <- servers) {
         server.shutdown()
         server.awaitShutdown()
@@ -143,7 +143,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
         futures.map(_.get)
         sent += numRecords
       } catch {
-        case e : Exception => failed = true
+        case _ : Exception => failed = true
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 14807bc..cebfb04 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -102,7 +102,6 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
 
   private val orgName = config.getProperty(MiniKdc.OrgName)
   private val orgDomain = config.getProperty(MiniKdc.OrgDomain)
-  private val dnString = s"dc=$orgName,dc=$orgDomain"
   private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}"
   private val krb5conf = new File(workDir, "krb5.conf")
 
@@ -163,7 +162,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
     val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
     partition.setId(orgName)
     partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
-    val dn = new Dn(dnString)
+    val dn = new Dn(s"dc=$orgName,dc=$orgDomain")
     partition.setSuffixDn(dn)
     ds.addPartition(partition)
 
@@ -207,7 +206,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
     val kerberosConfig = new KerberosConfig
     kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
     kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
-    kerberosConfig.setSearchBaseDn(dnString)
+    kerberosConfig.setSearchBaseDn(s"dc=$orgName,dc=$orgDomain")
     kerberosConfig.setPaEncTimestampRequired(false)
     kdc = new KdcServer(kerberosConfig)
     kdc.setDirectoryService(ds)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 6556100..51f02d1 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -301,7 +301,7 @@ object TestLogCleaning {
           consumedWriter.newLine()
         }
       } catch {
-        case e: ConsumerTimeoutException => 
+        case _: ConsumerTimeoutException =>
       }
     }
     consumedWriter.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8adc7e2..f5cee0c 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -69,7 +69,6 @@ object StressTestLog {
   abstract class WorkerThread extends Thread {
     override def run() {
       try {
-        var offset = 0
         while(running.get)
           work()
       } catch {
@@ -107,7 +106,7 @@ object StressTestLog {
           case _ =>
         }
       } catch {
-        case e: OffsetOutOfRangeException => // this is okay
+        case _: OffsetOutOfRangeException => // this is okay
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestCrcPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestCrcPerformance.scala b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
index 0c1e1ad..daeecbd 100755
--- a/core/src/test/scala/other/kafka/TestCrcPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
@@ -18,7 +18,6 @@ package kafka.log
 
 import java.util.Random
 import kafka.message._
-import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Utils
 
 object TestCrcPerformance {
@@ -28,21 +27,18 @@ object TestCrcPerformance {
       Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size")
     val numMessages = args(0).toInt
     val messageSize = args(1).toInt
-    //val numMessages = 100000000
-    //val messageSize = 32
 
-    val dir = TestUtils.tempDir()
     val content = new Array[Byte](messageSize)
     new Random(1).nextBytes(content)
 
     // create message test
     val start = System.nanoTime
-    for(i <- 0 until numMessages) {
+    for (_ <- 0 until numMessages)
       new Message(content)
-    }
-    val ellapsed = System.nanoTime - start
-    println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, ellapsed/(1000.0*1000.0*1000.0),
-      ellapsed / numMessages.toDouble))
+
+    val elapsed = System.nanoTime - start
+    println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, elapsed / (1000.0*1000.0*1000.0),
+      elapsed / numMessages.toDouble))
 
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index ab807a1..72c7f28 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -33,12 +33,13 @@ object TestKafkaAppender extends Logging {
     try {
       PropertyConfigurator.configure(args(0))
     } catch {
-      case e: Exception => System.err.println("KafkaAppender could not be initialized ! Exiting..")
-      e.printStackTrace()
-      System.exit(1)
+      case e: Exception =>
+        System.err.println("KafkaAppender could not be initialized ! Exiting..")
+        e.printStackTrace()
+        System.exit(1)
     }
 
-    for(i <- 1 to 10)
+    for (_ <- 1 to 10)
       info("test")    
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index db281bf..6bd8e4f 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -101,7 +101,8 @@ object TestLinearWriteSpeed {
     val rand = new Random
     rand.nextBytes(buffer.array)
     val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
-    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*)
+    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec,
+      messages = (0 until numMessages).map(_ => new Message(new Array[Byte](messageSize))): _*)
     
     val writables = new Array[Writable](numFiles)
     val scheduler = new KafkaScheduler(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9445191..9db2ffd 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -94,7 +94,7 @@ object TestOffsetManager {
         offset += 1
       }
       catch {
-        case e1: ClosedByInterruptException =>
+        case _: ClosedByInterruptException =>
           offsetsChannel.disconnect()
         case e2: IOException =>
           println("Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s.".format(id, offsetsChannel.host, offsetsChannel.port, groupId, e2))
@@ -158,7 +158,7 @@ object TestOffsetManager {
           }
         }
         catch {
-          case e1: ClosedByInterruptException =>
+          case _: ClosedByInterruptException =>
             channel.disconnect()
             channels.remove(coordinatorId)
           case e2: IOException =>
@@ -168,7 +168,7 @@ object TestOffsetManager {
         }
       }
       catch {
-        case e: IOException =>
+        case _: IOException =>
           println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
           metadataChannel.disconnect()
           println("Creating new query channel.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index ba89fc8..6ccac29 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -104,7 +104,7 @@ object TestPurgatoryPerformance {
     val latch = new CountDownLatch(numRequests)
     val start = System.currentTimeMillis
     val rand = new Random()
-    val keys = (0 until numKeys).map(i => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
+    val keys = (0 until numKeys).map(_ => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
     @volatile var requestArrivalTime = start
     @volatile var end = 0L
     val generator = new Runnable {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 763e4ec..0f846e1 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -69,8 +69,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
       AdminUtils.addPartitions(zkUtils, "Blah", 1)
       fail("Topic should not exist")
     } catch {
-      case e: AdminOperationException => //this is good
-      case e2: Throwable => throw e2
+      case _: AdminOperationException => //this is good
     }
   }
 
@@ -80,8 +79,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
       AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
       fail("Add partitions should fail")
     } catch {
-      case e: AdminOperationException => //this is good
-      case e2: Throwable => throw e2
+      case _: AdminOperationException => //this is good
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index ddfbb51..ff86693 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -250,7 +250,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         ConfigCommand.parseEntity(opts)
         fail("Did not fail with invalid argument list")
       } catch {
-        case e: IllegalArgumentException => // expected exception
+        case _: IllegalArgumentException => // expected exception
       }
     }
 
@@ -315,7 +315,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
           "--alter", "--add-config", "a=b,c=d")
       fail("Did not fail with invalid client-id")
     } catch {
-      case e: InvalidConfigException => // expected
+      case _: InvalidConfigException => // expected
     }
 
     checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ccb3618..d1fcbc0 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -164,7 +164,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   @Test
   def testAddPartitionDuringDeleteTopic() {
     val topic = "test"
-    val topicAndPartition = TopicAndPartition(topic, 0)
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
@@ -208,7 +207,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
       AdminUtils.deleteTopic(zkUtils, "test2")
       fail("Expected UnknownTopicOrPartitionException")
     } catch {
-      case e: UnknownTopicOrPartitionException => // expected exception
+      case _: UnknownTopicOrPartitionException => // expected exception
     }
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
@@ -270,7 +269,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
       fail("Expected TopicAlreadyMarkedForDeletionException")
     }
     catch {
-      case e: TopicAlreadyMarkedForDeletionException => // expected exception
+      case _: TopicAlreadyMarkedForDeletionException => // expected exception
     }
 
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -300,7 +299,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
     var counter = 0
-    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
       log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
       counter += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 3691919..39bcb7a 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -87,7 +87,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
     // action/test
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
+        val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.filter(_.group == group).size == 1 &&
         assignments.get.filter(_.group == group).head.consumerId.isDefined
@@ -113,7 +113,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
 
     // action/test
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
+        val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.filter(_.group == group).size == 2 &&
         assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 &&

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 2a3724e..90a354e 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -45,7 +45,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging wi
     kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
 
     val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
-    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+    val (proposedAssignment, _) = ReassignPartitionsCommand.generateAssignment(zkUtils,
       rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
 
     val assignment = proposedAssignment map { case (topicPartition, replicas) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
index fff3e7b..b71b00b 100644
--- a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
@@ -34,7 +34,7 @@ class ApiUtilsTest extends JUnitSuite {
   @Test
   def testShortStringNonASCII() {
     // Random-length strings
-    for(i <- 0 to 100) {
+    for(_ <- 0 to 100) {
       // Since we're using UTF-8 encoding, each encoded byte will be one to four bytes long 
       val s: String = ApiUtilsTest.rnd.nextString(math.abs(ApiUtilsTest.rnd.nextInt()) % (Short.MaxValue / 4))  
       val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
@@ -47,7 +47,7 @@ class ApiUtilsTest extends JUnitSuite {
   @Test
   def testShortStringASCII() {
     // Random-length strings
-    for(i <- 0 to 100) {
+    for(_ <- 0 to 100) {
       val s: String = TestUtils.randomString(math.abs(ApiUtilsTest.rnd.nextInt()) % Short.MaxValue)  
       val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
       ApiUtils.writeShortString(bb, s)
@@ -68,17 +68,13 @@ class ApiUtilsTest extends JUnitSuite {
       ApiUtils.shortStringLength(s2)
       fail
     } catch {
-      case e: KafkaException => {
-        // ok
-      }
+      case _: KafkaException => // ok
     }
     try {
       ApiUtils.writeShortString(bb, s2)
       fail
     } catch {
-      case e: KafkaException => {
-        // ok
-      }
+      case _: KafkaException => // ok
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index f8fbae7..16fe788 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,7 +23,6 @@ import kafka.common._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
 
-import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 
 import java.nio.ByteBuffer
@@ -37,10 +36,6 @@ import org.junit.Assert._
 object SerializationTestUtils {
   private val topic1 = "test1"
   private val topic2 = "test2"
-  private val leader1 = 0
-  private val isr1 = List(0, 1, 2)
-  private val leader2 = 0
-  private val isr2 = List(0, 2, 3)
   private val partitionDataFetchResponse0 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
   private val partitionDataFetchResponse1 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("second message".getBytes)))
   private val partitionDataFetchResponse2 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("third message".getBytes)))
@@ -84,37 +79,6 @@ object SerializationTestUtils {
   private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
                              new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
                              new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
-  private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
-
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3)
-  private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
-  private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
-  private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
-
-  private val leaderAndIsr0 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
-  private val leaderAndIsr1 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
-  private val leaderAndIsr2 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
-  private val leaderAndIsr3 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
-
-  private val leaderIsrAndControllerEpoch0 = new LeaderIsrAndControllerEpoch(leaderAndIsr0, controllerEpoch = 0)
-  private val leaderIsrAndControllerEpoch1 = new LeaderIsrAndControllerEpoch(leaderAndIsr1, controllerEpoch = 0)
-  private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0)
-  private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0)
-
-  private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet)
-  private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet)
-  private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet)
-  private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet)
-
-  private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map(
-    TopicAndPartition(topic1,0) -> partitionStateInfo0,
-    TopicAndPartition(topic1,1) -> partitionStateInfo1,
-    TopicAndPartition(topic1,2) -> partitionStateInfo2,
-    TopicAndPartition(topic1,3) -> partitionStateInfo3
-  )
 
   def createTestProducerRequest: ProducerRequest = {
     new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index 26154f2..2d20b1e 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -40,7 +40,7 @@ class ConfigTest {
         fail("Should throw InvalidClientIdException.")
       }
       catch {
-        case e: InvalidConfigException => "This is good."
+        case _: InvalidConfigException => // This is good
       }
     }
 
@@ -51,7 +51,7 @@ class ConfigTest {
         ProducerConfig.validateClientId(validClientIds(i))
       }
       catch {
-        case e: Exception => fail("Should not throw exception.")
+        case _: Exception => fail("Should not throw exception.")
       }
     }
   }
@@ -70,7 +70,7 @@ class ConfigTest {
         fail("Should throw InvalidGroupIdException.")
       }
       catch {
-        case e: InvalidConfigException => "This is good."
+        case _: InvalidConfigException => // This is good
       }
     }
 
@@ -81,7 +81,7 @@ class ConfigTest {
         ConsumerConfig.validateGroupId(validGroupIds(i))
       }
       catch {
-        case e: Exception => fail("Should not throw exception.")
+        case _: Exception => fail("Should not throw exception.")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
index 66549af..39eb315 100644
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -28,7 +28,7 @@ class TopicTest {
     val invalidTopicNames = new ArrayBuffer[String]()
     invalidTopicNames += ("", ".", "..")
     var longName = "ATCG"
-    for (i <- 1 to 6)
+    for (_ <- 1 to 6)
       longName += longName
     invalidTopicNames += longName
     invalidTopicNames += longName.drop(6)
@@ -43,7 +43,7 @@ class TopicTest {
         fail("Should throw InvalidTopicException.")
       }
       catch {
-        case e: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
+        case _: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
       }
     }
 
@@ -54,7 +54,7 @@ class TopicTest {
         Topic.validate(validTopicNames(i))
       }
       catch {
-        case e: Exception => fail("Should not throw exception.")
+        case _: Exception => fail("Should not throw exception.")
       }
     }
   }