You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/06 07:46:09 UTC

[2/3] kafka git commit: KAFKA-3771; Improving Kafka core code

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 8b523e7..1f148de 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -108,7 +108,7 @@ object StateChangeLogMerger extends Logging {
       val fileNameIndex = regex.lastIndexOf('/') + 1
       val dirName = if (fileNameIndex == 0) "." else regex.substring(0, fileNameIndex - 1)
       val fileNameRegex = new Regex(regex.substring(fileNameIndex))
-      files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName) != None).map(dirName + "/" + _.getName).toList
+      files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName).isDefined).map(dirName + "/" + _.getName).toList
     }
     if (options.has(topicOpt)) {
       topic = options.valueOf(topicOpt)
@@ -141,9 +141,9 @@ object StateChangeLogMerger extends Logging {
       if (!lineItr.isEmpty)
         lines ::= lineItr
     }
-    if (!lines.isEmpty) pqueue.enqueue(lines:_*)
+    if (lines.nonEmpty) pqueue.enqueue(lines:_*)
 
-    while (!pqueue.isEmpty) {
+    while (pqueue.nonEmpty) {
       val lineItr = pqueue.dequeue()
       output.write((lineItr.line + "\n").getBytes)
       val nextLineItr = getNextLine(lineItr.itr)
@@ -182,7 +182,7 @@ object StateChangeLogMerger extends Logging {
 
   class LineIterator(val line: String, val itr: Iterator[String]) {
     def this() = this("", null)
-    def isEmpty = (line == "" && itr == null)
+    def isEmpty = line == "" && itr == null
   }
 
   implicit object dateBasedOrdering extends Ordering[LineIterator] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 3077896..5f39402 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -89,7 +89,7 @@ object VerifyConsumerRebalance extends Logging {
       info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
       info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
       val partitionsWithOwners = zkUtils.getChildrenParentMayNotExist(topicDirs.consumerOwnerDir)
-      if(partitionsWithOwners.size == 0) {
+      if(partitionsWithOwners.isEmpty) {
         error("No owners for any partitions for topic " + topic)
         rebalanceSucceeded = false
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 5b6c59f..21658d3 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -258,7 +258,7 @@ object CoreUtils extends Logging {
        * Per RFC4627, section 2.5, we're not technically required to
        * encode the C1 codes, but we do to be safe.
        */
-      case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
+      case c if (c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f') => "\\u%04x".format(c: Int)
       case c => c
     }.mkString
   }
@@ -269,7 +269,7 @@ object CoreUtils extends Logging {
   def duplicates[T](s: Traversable[T]): Iterable[T] = {
     s.groupBy(identity)
       .map{ case (k,l) => (k,l.size)}
-      .filter{ case (k,l) => (l > 1) }
+      .filter{ case (k,l) => l > 1 }
       .keys
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/ToolsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index fef9392..65758d8 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -29,7 +29,7 @@ object ToolsUtils {
       hostPortData =>
         org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
     }
-    val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
+    val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
     if(!isValid)
       CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 34cab87..f57245f 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -227,6 +227,6 @@ class VerifiableProperties(val props: Properties) extends Logging {
     }
   }
   
-  override def toString(): String = props.toString
+  override def toString: String = props.toString
  
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1278a70..f02ab20 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -653,7 +653,7 @@ class ZkUtils(val zkClient: ZkClient,
       val topic = topicAndPartitionMap._1
       val partitionMap = topicAndPartitionMap._2
       debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
-      (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
+      topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
     }
   }
 
@@ -663,7 +663,7 @@ class ZkUtils(val zkClient: ZkClient,
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
         val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
-        reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2)))
+        reassignedPartitions.map(p => p._1 -> new ReassignedPartitionsContext(p._2))
       case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
     }
   }
@@ -828,9 +828,9 @@ class ZkUtils(val zkClient: ZkClient,
     val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
     if(topics == null) Set.empty[TopicAndPartition]
     else {
-      topics.map { topic =>
+      topics.flatMap { topic =>
         getChildren(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
-      }.flatten.toSet
+      }.toSet
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 3d39475..891a72c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -63,25 +63,25 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testListGroups() {
-    consumers(0).subscribe(List(topic))
+    consumers.head.subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
-      consumers(0).poll(0)
-      !consumers(0).assignment().isEmpty
+      consumers.head.poll(0)
+      !consumers.head.assignment().isEmpty
     }, "Expected non-empty assignment")
 
     val groups = client.listAllGroupsFlattened
     assertFalse(groups.isEmpty)
-    val group = groups(0)
+    val group = groups.head
     assertEquals(groupId, group.groupId)
     assertEquals("consumer", group.protocolType)
   }
 
   @Test
   def testDescribeGroup() {
-    consumers(0).subscribe(List(topic))
+    consumers.head.subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
-      consumers(0).poll(0)
-      !consumers(0).assignment().isEmpty
+      consumers.head.poll(0)
+      !consumers.head.assignment().isEmpty
     }, "Expected non-empty assignment")
 
     val group = client.describeGroup(groupId)
@@ -90,7 +90,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
     assertEquals("Stable", group.state)
     assertFalse(group.members.isEmpty)
 
-    val member = group.members(0)
+    val member = group.members.head
     assertEquals(clientId, member.clientId)
     assertFalse(member.clientHost.isEmpty)
     assertFalse(member.memberId.isEmpty)
@@ -98,10 +98,10 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDescribeConsumerGroup() {
-    consumers(0).subscribe(List(topic))
+    consumers.head.subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
-      consumers(0).poll(0)
-      !consumers(0).assignment().isEmpty
+      consumers.head.poll(0)
+      !consumers.head.assignment().isEmpty
     }, "Expected non-empty assignment")
 
     val consumerSummaries = client.describeConsumerGroup(groupId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2d5900f..10e0bae 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -413,8 +413,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
     addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
     try {
-      this.consumers(0).assign(List(topicPartition).asJava)
-      consumeRecords(this.consumers(0))
+      this.consumers.head.assign(List(topicPartition).asJava)
+      consumeRecords(this.consumers.head)
       Assert.fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
@@ -425,7 +425,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
 
     sendRecords(numRecords, topicPartition)
-    consumeRecords(this.consumers(0), topic = newTopic, part = 0)
+    consumeRecords(this.consumers.head, topic = newTopic, part = 0)
   }
 
   @Test(expected = classOf[AuthorizationException])
@@ -505,7 +505,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   @Test
   def testListOffsetsWithNoTopicAccess() {
     val e = intercept[TopicAuthorizationException] {
-      this.consumers.head.partitionsFor(topic);
+      this.consumers.head.partitionsFor(topic)
     }
     assertEquals(Set(topic), e.unauthorizedTopics().asScala)
   }
@@ -513,7 +513,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   @Test
   def testListOfsetsWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
-    this.consumers.head.partitionsFor(topic);
+    this.consumers.head.partitionsFor(topic)
   }
 
   def removeAllAcls() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 23fcfa6..ea74d5d 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -68,17 +68,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     val numRecords = 10000
     sendRecords(numRecords)
 
-    assertEquals(0, this.consumers(0).assignment.size)
-    this.consumers(0).assign(List(tp).asJava)
-    assertEquals(1, this.consumers(0).assignment.size)
+    assertEquals(0, this.consumers.head.assignment.size)
+    this.consumers.head.assign(List(tp).asJava)
+    assertEquals(1, this.consumers.head.assignment.size)
 
-    this.consumers(0).seek(tp, 0)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0)
+    this.consumers.head.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0)
 
     // check async commit callbacks
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(commitCallback)
-    awaitCommitCallback(this.consumers(0), commitCallback)
+    this.consumers.head.commitAsync(commitCallback)
+    awaitCommitCallback(this.consumers.head, commitCallback)
   }
 
   @Test
@@ -132,28 +132,28 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(5, tp)
     sendRecords(7, tp2)
 
-    this.consumers(0).assign(List(tp, tp2).asJava)
+    this.consumers.head.assign(List(tp, tp2).asJava)
 
     // Need to poll to join the group
-    this.consumers(0).poll(50)
-    val pos1 = this.consumers(0).position(tp)
-    val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
-    assertEquals(3, this.consumers(0).committed(tp).offset)
-    assertNull(this.consumers(0).committed(tp2))
+    this.consumers.head.poll(50)
+    val pos1 = this.consumers.head.position(tp)
+    val pos2 = this.consumers.head.position(tp2)
+    this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+    assertEquals(3, this.consumers.head.committed(tp).offset)
+    assertNull(this.consumers.head.committed(tp2))
 
     // Positions should not change
-    assertEquals(pos1, this.consumers(0).position(tp))
-    assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
-    assertEquals(3, this.consumers(0).committed(tp).offset)
-    assertEquals(5, this.consumers(0).committed(tp2).offset)
+    assertEquals(pos1, this.consumers.head.position(tp))
+    assertEquals(pos2, this.consumers.head.position(tp2))
+    this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+    assertEquals(3, this.consumers.head.committed(tp).offset)
+    assertEquals(5, this.consumers.head.committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
-    awaitCommitCallback(this.consumers(0), commitCallback)
-    assertEquals(7, this.consumers(0).committed(tp2).offset)
+    this.consumers.head.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
+    awaitCommitCallback(this.consumers.head, commitCallback)
+    assertEquals(7, this.consumers.head.committed(tp2).offset)
   }
 
   @Test
@@ -194,10 +194,10 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     while (parts == null)
       parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
     assertEquals(1, parts.size)
-    assertNotNull(parts(0).leader())
+    assertNotNull(parts.head.leader())
 
     // shutdown the coordinator
-    val coordinator = parts(0).leader().id()
+    val coordinator = parts.head.leader().id()
     this.servers(coordinator).shutdown()
 
     // this should cause another callback execution
@@ -269,7 +269,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   protected def sendRecords(numRecords: Int, tp: TopicPartition) {
-    sendRecords(this.producers(0), numRecords, tp)
+    sendRecords(this.producers.head, numRecords, tp)
   }
 
   protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, tp: TopicPartition) {
@@ -416,7 +416,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
    */
   def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
                                  partitions: Set[TopicPartition]): Boolean = {
-    val allNonEmptyAssignments = assignments forall (assignment => assignment.size > 0)
+    val allNonEmptyAssignments = assignments forall (assignment => assignment.nonEmpty)
     if (!allNonEmptyAssignments) {
       // at least one consumer got empty assignment
       return false

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 0a2b49a..8eaf827 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -56,7 +56,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     super.setUp()
 
     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024 * 1024, "")
+    consumer1 = new SimpleConsumer("localhost", servers.head.boundPort(), 100, 1024 * 1024, "")
     consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024 * 1024, "")
   }
 
@@ -298,7 +298,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       }
 
       // make sure the fetched messages also respect the partitioning and ordering
-      val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+      val fetchResponse1 = if (leader1.get == configs.head.brokerId) {
         consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
       } else {
         consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -307,7 +307,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
 
       // TODO: also check topic and partition after they are added in the return messageSet
-      for (i <- 0 to numRecords - 1) {
+      for (i <- 0 until numRecords) {
         assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes, now, Message.MagicValue_V1), messageSet1(i).message)
         assertEquals(i.toLong, messageSet1(i).offset)
       }
@@ -386,7 +386,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
             assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
         }
       }
-      val fetchResponse = if (leader0.get == configs(0).brokerId) {
+      val fetchResponse = if (leader0.get == configs.head.brokerId) {
         consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
       } else {
         consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
@@ -423,13 +423,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
       try {
         // send message to partition 0
-        val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
+        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         // flush the messages.
         producer.flush()
         assertTrue("All request are complete.", responses.forall(_.isDone()))
         // Check the messages received by broker.
-        val fetchResponse = if (leader.get == configs(0).brokerId) {
+        val fetchResponse = if (leader.get == configs.head.brokerId) {
           consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
         } else {
           consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
@@ -446,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @Test
   def testSendWithInvalidCreateTime() {
     val topicProps = new Properties()
-    topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000");
+    topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
     TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
 
     val producer = createProducer(brokerList = brokerList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 8424340..c76a216 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -79,7 +79,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     this.producers.foreach(_.close)
 
     var consumed = 0L
-    val consumer = this.consumers(0)
+    val consumer = this.consumers.head
 
     consumer.subscribe(List(topic), new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
@@ -124,7 +124,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     sendRecords(numRecords)
     this.producers.foreach(_.close)
 
-    val consumer = this.consumers(0)
+    val consumer = this.consumers.head
     consumer.assign(List(tp))
     consumer.seek(tp, 0)
 
@@ -174,7 +174,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
 
   private def sendRecords(numRecords: Int) {
     val futures = (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+      this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
     }
     futures.map(_.get)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 6e76f90..29d3bd6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -81,7 +81,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,
-      servers(0).groupCoordinator.offsetsTopicConfigs)
+      servers.head.groupCoordinator.offsetsTopicConfigs)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b22ccde..a5a6cd6 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -79,8 +79,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.close()
 
     // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers(0).committed(tp).offset)
-    assertEquals(500, this.consumers(0).committed(tp2).offset)
+    assertEquals(300, this.consumers.head.committed(tp).offset)
+    assertEquals(500, this.consumers.head.committed(tp2).offset)
   }
 
   @Test
@@ -109,22 +109,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.close()
 
     // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers(0).committed(tp).offset)
-    assertEquals(500, this.consumers(0).committed(tp2).offset)
+    assertEquals(300, this.consumers.head.committed(tp).offset)
+    assertEquals(500, this.consumers.head.committed(tp2).offset)
   }
 
   @Test
   def testAutoOffsetReset() {
     sendRecords(1)
-    this.consumers(0).assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
+    this.consumers.head.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 1, startingOffset = 0)
   }
 
   @Test
   def testGroupConsumption() {
     sendRecords(10)
-    this.consumers(0).subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
+    this.consumers.head.subscribe(List(topic).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 1, startingOffset = 0)
   }
 
   @Test
@@ -147,11 +147,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(1000, new TopicPartition(topic3, 0))
     sendRecords(1000, new TopicPartition(topic3, 1))
 
-    assertEquals(0, this.consumers(0).assignment().size)
+    assertEquals(0, this.consumers.head.assignment().size)
 
     val pattern = Pattern.compile("t.*c")
-    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
-    this.consumers(0).poll(50)
+    this.consumers.head.subscribe(pattern, new TestConsumerReassignmentListener)
+    this.consumers.head.poll(50)
 
     var subscriptions = Set(
       new TopicPartition(topic, 0),
@@ -160,9 +160,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       new TopicPartition(topic1, 1))
 
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
 
     val topic4 = "tsomec" // matches subscribed pattern
     TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
@@ -175,12 +175,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
 
-    this.consumers(0).unsubscribe()
-    assertEquals(0, this.consumers(0).assignment().size)
+    this.consumers.head.unsubscribe()
+    assertEquals(0, this.consumers.head.assignment().size)
   }
 
   @Test
@@ -193,10 +193,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(1000, new TopicPartition(topic1, 0))
     sendRecords(1000, new TopicPartition(topic1, 1))
 
-    assertEquals(0, this.consumers(0).assignment().size)
+    assertEquals(0, this.consumers.head.assignment().size)
 
-    this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
-    this.consumers(0).poll(50)
+    this.consumers.head.subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
+    this.consumers.head.poll(50)
 
     val subscriptions = Set(
       new TopicPartition(topic, 0),
@@ -205,39 +205,39 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       new TopicPartition(topic1, 1))
 
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
 
-    this.consumers(0).unsubscribe()
-    assertEquals(0, this.consumers(0).assignment().size)
+    this.consumers.head.unsubscribe()
+    assertEquals(0, this.consumers.head.assignment().size)
   }
 
   @Test
   def testCommitMetadata() {
-    this.consumers(0).assign(List(tp).asJava)
+    this.consumers.head.assign(List(tp).asJava)
 
     // sync commit
     val syncMetadata = new OffsetAndMetadata(5, "foo")
-    this.consumers(0).commitSync(Map((tp, syncMetadata)).asJava)
-    assertEquals(syncMetadata, this.consumers(0).committed(tp))
+    this.consumers.head.commitSync(Map((tp, syncMetadata)).asJava)
+    assertEquals(syncMetadata, this.consumers.head.committed(tp))
 
     // async commit
     val asyncMetadata = new OffsetAndMetadata(10, "bar")
     val callback = new CountConsumerCommitCallback
-    this.consumers(0).commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-    awaitCommitCallback(this.consumers(0), callback)
-    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+    this.consumers.head.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
+    awaitCommitCallback(this.consumers.head, callback)
+    assertEquals(asyncMetadata, this.consumers.head.committed(tp))
 
     // handle null metadata
     val nullMetadata = new OffsetAndMetadata(5, null)
-    this.consumers(0).commitSync(Map((tp, nullMetadata)).asJava)
-    assertEquals(nullMetadata, this.consumers(0).committed(tp))
+    this.consumers.head.commitSync(Map((tp, nullMetadata)).asJava)
+    assertEquals(nullMetadata, this.consumers.head.committed(tp))
   }
 
   @Test
   def testAsyncCommit() {
-    val consumer = this.consumers(0)
+    val consumer = this.consumers.head
     consumer.assign(List(tp).asJava)
     consumer.poll(0)
 
@@ -255,18 +255,18 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
     val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
-    this.consumers(0).subscribe(List(topic).asJava)
+    this.consumers.head.subscribe(List(topic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
 
     TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
-    this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+    this.consumers.head.subscribe(List(topic, otherTopic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == expandedSubscriptions.asJava
-    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment == expandedSubscriptions.asJava
+    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers.head.assignment}")
   }
 
   @Test
@@ -275,42 +275,42 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+    this.consumers.head.subscribe(List(topic, otherTopic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
 
-    this.consumers(0).subscribe(List(topic).asJava)
+    this.consumers.head.subscribe(List(topic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == shrunkenSubscriptions.asJava
-    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+      this.consumers.head.poll(50)
+      this.consumers.head.assignment == shrunkenSubscriptions.asJava
+    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers.head.assignment}")
   }
 
   @Test
   def testPartitionsFor() {
     val numParts = 2
     TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
-    val parts = this.consumers(0).partitionsFor("part-test")
+    val parts = this.consumers.head.partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
   }
 
   @Test
   def testPartitionsForAutoCreate() {
-    val partitions = this.consumers(0).partitionsFor("non-exist-topic")
+    val partitions = this.consumers.head.partitionsFor("non-exist-topic")
     assertFalse(partitions.isEmpty)
   }
 
   @Test(expected = classOf[InvalidTopicException])
   def testPartitionsForInvalidTopic() {
-    this.consumers(0).partitionsFor(";3# ads,{234")
+    this.consumers.head.partitionsFor(";3# ads,{234")
   }
 
   @Test
   def testSeek() {
-    val consumer = this.consumers(0)
+    val consumer = this.consumers.head
     val totalRecords = 50L
     val mid = totalRecords / 2
 
@@ -366,23 +366,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testPositionAndCommit() {
     sendRecords(5)
 
-    assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
+    assertNull(this.consumers.head.committed(new TopicPartition(topic, 15)))
 
     // position() on a partition that we aren't subscribed to throws an exception
     intercept[IllegalArgumentException] {
-      this.consumers(0).position(new TopicPartition(topic, 15))
+      this.consumers.head.position(new TopicPartition(topic, 15))
     }
 
-    this.consumers(0).assign(List(tp).asJava)
+    this.consumers.head.assign(List(tp).asJava)
 
-    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals(0L, this.consumers(0).committed(tp).offset)
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp))
+    this.consumers.head.commitSync()
+    assertEquals(0L, this.consumers.head.committed(tp).offset)
 
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp))
+    this.consumers.head.commitSync()
+    assertEquals("Committed offset should be returned", 5L, this.consumers.head.committed(tp).offset)
 
     sendRecords(1)
 
@@ -395,18 +395,18 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testPartitionPauseAndResume() {
     val partitions = List(tp).asJava
     sendRecords(5)
-    this.consumers(0).assign(partitions)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
-    this.consumers(0).pause(partitions)
+    this.consumers.head.assign(partitions)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
+    this.consumers.head.pause(partitions)
     sendRecords(5)
-    assertTrue(this.consumers(0).poll(0).isEmpty)
-    this.consumers(0).resume(partitions)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 5)
+    assertTrue(this.consumers.head.poll(0).isEmpty)
+    this.consumers.head.resume(partitions)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 5)
   }
 
   @Test
   def testFetchInvalidOffset() {
-    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
     consumers += consumer0
 
@@ -441,7 +441,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // produce a record that is larger than the configured fetch size
     val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
-    this.producers(0).send(record)
+    this.producers.head.send(record)
 
     // consuming a too-large record should fail
     consumer0.assign(List(tp).asJava)
@@ -713,14 +713,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val numRecords = 50
     // Test non-compressed messages
     sendRecords(numRecords, tp)
-    this.consumers(0).assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
+    this.consumers.head.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = 0)
 
     // Test compressed messages
     sendCompressedMessages(numRecords, tp2)
-    this.consumers(0).assign(List(tp2).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+    this.consumers.head.assign(List(tp2).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = 0)
   }
 
@@ -737,15 +737,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // Test non-compressed messages
     val tp1 = new TopicPartition(topicName, 0)
     sendRecords(numRecords, tp1)
-    this.consumers(0).assign(List(tp1).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
+    this.consumers.head.assign(List(tp1).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
 
     // Test compressed messages
     val tp2 = new TopicPartition(topicName, 1)
     sendCompressedMessages(numRecords, tp2)
-    this.consumers(0).assign(List(tp2).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+    this.consumers.head.assign(List(tp2).asJava)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index fc79c60..5814e94 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -32,8 +32,8 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne
   @Test
   def testMultipleBrokerMechanisms() {
 
-    val plainSaslProducer = producers(0)
-    val plainSaslConsumer = consumers(0)
+    val plainSaslProducer = producers.head
+    val plainSaslConsumer = consumers.head
 
     val gssapiSaslProperties = kafkaSaslProperties("GSSAPI")
     val gssapiSaslProducer = TestUtils.createNewProducer(brokerList,

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 2e288ec..6556100 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -311,9 +311,9 @@ object TestLogCleaning {
   
 }
 
-case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) {
+case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
   def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
   def this(line: String) = this(line.split("\t"))
-  override def toString() = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
+  override def toString = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
   def topicAndKey = topic + key
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 5c2f1ae..9445191 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -255,7 +255,7 @@ object TestOffsetManager {
     var statsThread: StatsThread = null
     try {
       zkUtils = ZkUtils(zookeeper, 6000, 2000, false)
-      commitThreads = (0 to (threadCount-1)).map { threadId =>
+      commitThreads = (0 until threadCount).map { threadId =>
         new CommitThread(threadId, partitionCount, commitIntervalMs, zkUtils)
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ab8d363..763e4ec 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -134,7 +134,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     assertEquals(partitionDataForTopic2(2).partitionId, 2)
     val replicas = partitionDataForTopic2(1).replicas
     assertEquals(replicas.size, 2)
-    assert(replicas(0).id == 0 || replicas(0).id == 1)
+    assert(replicas.head.id == 0 || replicas.head.id == 1)
     assert(replicas(1).id == 0 || replicas(1).id == 1)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 21bb6ab..7df1411 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -127,7 +127,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
-    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> zkUtils.getReplicasForPartition(topic, p))).toMap
+    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
@@ -174,9 +174,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
         ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
@@ -205,9 +205,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
         ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
@@ -236,9 +236,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
         ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index bcfcfad..7c71aed 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -63,6 +63,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
     val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts)
     assertEquals(1, deletedProps.size)
-    assertEquals("a", deletedProps(0))
+    assertEquals("a", deletedProps.head)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 477dcc8..1e1a98c 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -121,9 +121,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+      val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
       ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
@@ -223,17 +223,17 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicAndPartition.topic
 
     val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
-    brokerConfigs(0).setProperty("delete.topic.enable", "true")
-    brokerConfigs(0).setProperty("log.cleaner.enable","true")
-    brokerConfigs(0).setProperty("log.cleanup.policy","compact")
-    brokerConfigs(0).setProperty("log.segment.bytes","100")
-    brokerConfigs(0).setProperty("log.segment.delete.delay.ms","1000")
-    brokerConfigs(0).setProperty("log.cleaner.dedupe.buffer.size","1048577")
+    brokerConfigs.head.setProperty("delete.topic.enable", "true")
+    brokerConfigs.head.setProperty("log.cleaner.enable","true")
+    brokerConfigs.head.setProperty("log.cleanup.policy","compact")
+    brokerConfigs.head.setProperty("log.segment.bytes","100")
+    brokerConfigs.head.setProperty("log.segment.delete.delay.ms","1000")
+    brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
     val servers = createTestTopicAndCluster(topic,brokerConfigs)
 
     // for simplicity, we are validating cleaner offsets on a single broker
-    val server = servers(0)
+    val server = servers.head
     val log = server.logManager.getLog(topicAndPartition).get
 
     // write to the topic to activate cleaner

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ac7ce51..653b40c 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -68,7 +68,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
     val messages = messageStrings.map(s => new Message(s.getBytes))
     val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new LongRef(0), messages:_*)
 
-    topicInfos(0).enqueue(messageSet)
+    topicInfos.head.enqueue(messageSet)
     assertEquals(1, queue.size)
     queue.put(ZookeeperConsumerConnector.shutdownCommand)
 
@@ -92,7 +92,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
     val messages = messageStrings.map(s => new Message(s.getBytes))
     val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(0), messages:_*)
 
-    topicInfos(0).enqueue(messageSet)
+    topicInfos.head.enqueue(messageSet)
     assertEquals(1, queue.size)
 
     val iter = new ConsumerIterator[String, String](queue,

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index a69fba1..b054794 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -430,7 +430,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {
     var beforeReleasingPartitionsCalled: Boolean = false
     var beforeStartingFetchersCalled: Boolean = false
-    var consumerId: String = "";
+    var consumerId: String = ""
     var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null
     var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]] = null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 91ac1f6..699715b 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -65,7 +65,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   @Test
   def testMetadataUpdate() {
     log.setLevel(Level.INFO)
-    var controller: KafkaServer = this.servers.head;
+    var controller: KafkaServer = this.servers.head
     // Find the current controller
     val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
     for (server <- this.servers) {
@@ -121,7 +121,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     var counter = 0
     while (!found && counter < 10) {
       for (server <- this.servers) {
-        val previousEpoch = (epochMap get server.config.brokerId) match {
+        val previousEpoch = epochMap get server.config.brokerId match {
           case Some(epoch) =>
             epoch
           case None =>
@@ -130,7 +130,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
         }
 
         if (server.kafkaController.isActive
-            && (previousEpoch) < server.kafkaController.epoch) {
+            && previousEpoch < server.kafkaController.epoch) {
           controller = server
           found = true
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index beab1b5..dc343fa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -802,7 +802,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val (error, groups) = groupCoordinator.handleListGroups()
     assertEquals(Errors.NONE, error)
     assertEquals(1, groups.size)
-    assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+    assertEquals(GroupOverview("groupId", "consumer"), groups.head)
   }
 
   @Test
@@ -814,7 +814,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val (error, groups) = groupCoordinator.handleListGroups()
     assertEquals(Errors.NONE, error)
     assertEquals(1, groups.size)
-    assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+    assertEquals(GroupOverview("groupId", "consumer"), groups.head)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 85e9cad..140f615 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -62,7 +62,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
     val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
     val request = new FetchRequest(requestInfo = partitionRequests)
     val fetched = consumer.fetch(request)
-    assertTrue(!fetched.hasError && fetched.data.size == 0)
+    assertTrue(!fetched.hasError && fetched.data.isEmpty)
   }
 
   @Test
@@ -152,7 +152,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
         response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
-        case e: OffsetOutOfRangeException => "this is good"
+        case e: OffsetOutOfRangeException => // This is good.
       }
     }
 
@@ -168,7 +168,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
         response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
-        case e: UnknownTopicOrPartitionException => "this is good"
+        case e: UnknownTopicOrPartitionException => // This is good.
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 2fdfc48..bdf116f 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -36,7 +36,7 @@ trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName)
-    consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64 * 1024, "")
+    consumer = new SimpleConsumer(host, servers.head.boundPort(), 1000000, 64 * 1024, "")
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 752a260..8212121 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -67,7 +67,7 @@ class CleanerTest extends JUnitSuite {
     while(log.numberOfSegments < 4)
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
     val keysFound = keysInLog(log)
-    assertEquals((0L until log.logEndOffset), keysFound)
+    assertEquals(0L until log.logEndOffset, keysFound)
     
     // pretend we have the following keys
     val keys = immutable.ListSet(1, 3, 5, 7, 9)
@@ -211,7 +211,7 @@ class CleanerTest extends JUnitSuite {
     // grouping by very large values should result in a single group with all the segments in it
     var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
     assertEquals(1, groups.size)
-    assertEquals(log.numberOfSegments, groups(0).size)
+    assertEquals(log.numberOfSegments, groups.head.size)
     checkSegmentOrder(groups)
     
     // grouping by very small values should result in all groups having one entry

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 534443c..417aa75 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -145,7 +145,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
    */
   @Test
   def testTruncate() {
-    val message = messageSet.toList(0)
+    val message = messageSet.toList.head
     val end = messageSet.searchFor(1, 0).position
     messageSet.truncateTo(end)
     assertEquals(List(message), messageSet.toList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index cc9873c..a862cb1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -75,7 +75,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     cleaner.awaitCleaned("log", 0, firstDirty2)
 
     val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
-    assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2);
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2)
 
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index f290d54..7b52a09 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -107,7 +107,7 @@ class LogManagerTest {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
-      case e: OffsetOutOfRangeException => "This is good."
+      case e: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
     log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -152,7 +152,7 @@ class LogManagerTest {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
-      case e: OffsetOutOfRangeException => "This is good."
+      case e: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
     log.append(TestUtils.singleMessageSet("test".getBytes()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f48f6b1..33dd68e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -617,10 +617,10 @@ class LogTest extends JUnitSuite {
     for (i<- 1 to msgPerSeg)
       log.append(set)
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
-    assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList(0).index.maxEntries)
+    assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList.head.index.maxEntries)
     log.truncateTo(0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
-    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
+    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries)
     for (i<- 1 to msgPerSeg)
       log.append(set)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 758dad2..8f66d62 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -383,7 +383,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
   
   /* check that offsets are assigned based on byte offset from the given base offset */
   def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {
-    assertTrue("Message set should not be empty", messages.size > 0)
+    assertTrue("Message set should not be empty", messages.nonEmpty)
     var offset = baseOffset
     for(entry <- messages) {
       assertEquals("Unexpected offset in message set iterator", offset, entry.offset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 3c8a41f..5c02125 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -28,12 +28,12 @@ import org.junit.{Before, Test}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Utils
 
-case class MessageTestVal(val key: Array[Byte], 
-                          val payload: Array[Byte],
-                          val codec: CompressionCodec,
-                          val timestamp: Long,
-                          val magicValue: Byte,
-                          val message: Message)
+case class MessageTestVal(key: Array[Byte],
+                          payload: Array[Byte],
+                          codec: CompressionCodec,
+                          timestamp: Long,
+                          magicValue: Byte,
+                          message: Message)
 
 class MessageTest extends JUnitSuite {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d215430..e60f350 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.network;
+package kafka.network
 
 import java.net._
 import javax.net.ssl._

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index cf25cdb..dc73db3 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -186,11 +186,11 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     }
     assertEquals("Should have fetched 2 messages", 2, messageSet.size)
     // Message 1
-    assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet(0).message.payload))
-    assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet(0).message.key))
-    assertTrue(messageSet(0).message.timestamp >= startTime && messageSet(0).message.timestamp < endTime)
-    assertEquals(TimestampType.CREATE_TIME, messageSet(0).message.timestampType)
-    assertEquals(Message.MagicValue_V1, messageSet(0).message.magic)
+    assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet.head.message.payload))
+    assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet.head.message.key))
+    assertTrue(messageSet.head.message.timestamp >= startTime && messageSet.head.message.timestamp < endTime)
+    assertEquals(TimestampType.CREATE_TIME, messageSet.head.message.timestampType)
+    assertEquals(Message.MagicValue_V1, messageSet.head.message.magic)
 
     // Message 2
     assertTrue(ByteBuffer.wrap("test2".getBytes).equals(messageSet(1).message.payload))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 8e234d2..270a794 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -99,7 +99,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
     assertTrue(response != null)
-    assertTrue(!response.hasError && response.status.size == 0)
+    assertTrue(!response.hasError && response.status.isEmpty)
   }
 
   @Test
@@ -110,7 +110,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
     TestUtils.createTopic(zkUtils, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
 
-    val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
+    val message1 = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
     val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
 
@@ -118,7 +118,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
     assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
 
-    val safeSize = configs(0).messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
+    val safeSize = configs.head.messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
     val message2 = new Message(new Array[Byte](safeSize))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
     val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
@@ -142,14 +142,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
 
     // This message will be dropped silently since message size too large.
     producer.send(produceRequest("test", 0,
-      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
 
     // Send another message whose size is large enough to exceed the buffer size so
     // the socket buffer will be flushed immediately;
     // this send should fail since the socket has been closed
     try {
       producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
     } catch {
       case e : java.io.IOException => // success
       case e2: Throwable => throw e2

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index bbec5b1..9203130 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -80,7 +80,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     for (path <- zkUtils.persistentZkPaths) {
       zkUtils.makeSurePersistentPathExists(path)
       if(!path.equals(ZkUtils.ConsumersPath)) {
-        val aclList = (zkUtils.zkConnection.getAcl(path)).getKey
+        val aclList = zkUtils.zkConnection.getAcl(path).getKey
         assertTrue(aclList.size == 2)
         for (acl: ACL <- aclList.asScala) {
           assertTrue(isAclSecure(acl))
@@ -207,15 +207,15 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
     for (path <- secondZk.securePersistentZkPaths) {
-      val listParent = (secondZk.zkConnection.getAcl(path)).getKey
+      val listParent = secondZk.zkConnection.getAcl(path).getKey
       assertTrue(path, isAclCorrect(listParent, secondZk.isSecure))
 
       val childPath = path + "/fpjwashere"
-      val listChild = (secondZk.zkConnection.getAcl(childPath)).getKey
+      val listChild = secondZk.zkConnection.getAcl(childPath).getKey
       assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure))
     }
     // Check consumers path.
-    val consumersAcl = (firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath)).getKey
+    val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey
     assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false))
   }
 
@@ -223,7 +223,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Verifies that the path has the appropriate secure ACL.
    */
   private def verify(path: String): Boolean = {
-    val list = (zkUtils.zkConnection.getAcl(path)).getKey
+    val list = zkUtils.zkConnection.getAcl(path).getKey
     list.asScala.forall(isAclSecure)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index c5b61de..591fcf7 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -78,7 +78,7 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
         val topicAndPart = TopicAndPartition(topic, partition)
         val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.forall { item =>
-          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
+          expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset
         }
       }
       result

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index d1ad3a3..af979e4 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -37,7 +37,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   @Test
   def testConfigChange() {
     assertTrue("Should contain a ConfigHandler for topics",
-               this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topic))
+               this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic))
     val oldVal: java.lang.Long = 100000L
     val newVal: java.lang.Long = 200000L
     val tp = TopicAndPartition("test", 0)
@@ -45,21 +45,21 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
     AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
-      val logOpt = this.servers(0).logManager.getLog(tp)
+      val logOpt = this.servers.head.logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
     logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
     AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
     TestUtils.retry(10000) {
-      assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval)
+      assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
     }
   }
 
   @Test
   def testClientQuotaConfigChange() {
     assertTrue("Should contain a ConfigHandler for topics",
-               this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
+               this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client))
     val clientId = "testClient"
     val props = new Properties()
     props.put(ClientConfigOverride.ProducerOverride, "1000")
@@ -67,8 +67,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
 
     TestUtils.retry(10000) {
-      val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
-      val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers
+      val configHandler = this.servers.head.dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
+      val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
       val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
       val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 26e2817..f5b515b 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -56,7 +56,7 @@ class HighwatermarkPersistenceTest {
     val metrics = new Metrics
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkUtils, scheduler,
-      logManagers(0), new AtomicBoolean(false))
+      logManagers.head, new AtomicBoolean(false))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -64,7 +64,7 @@ class HighwatermarkPersistenceTest {
       assertEquals(0L, fooPartition0Hw)
       val partition0 = replicaManager.getOrCreatePartition(topic, 0)
       // create leader and follower replicas
-      val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
+      val log0 = logManagers.head.createLog(TopicAndPartition(topic, 0), LogConfig())
       val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
       partition0.addReplicaIfNotExists(leaderReplicaPartition0)
       val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
@@ -99,7 +99,7 @@ class HighwatermarkPersistenceTest {
     val metrics = new Metrics
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkUtils,
-      scheduler, logManagers(0), new AtomicBoolean(false))
+      scheduler, logManagers.head, new AtomicBoolean(false))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -107,7 +107,7 @@ class HighwatermarkPersistenceTest {
       assertEquals(0L, topic1Partition0Hw)
       val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
       // create leader log
-      val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
+      val topic1Log0 = logManagers.head.createLog(TopicAndPartition(topic1, 0), LogConfig())
       // create a local replica for topic1
       val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
       topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -123,7 +123,7 @@ class HighwatermarkPersistenceTest {
       // add another partition and set highwatermark
       val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
       // create leader log
-      val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
+      val topic2Log0 = logManagers.head.createLog(TopicAndPartition(topic2, 0), LogConfig())
       // create a local replica for topic2
       val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
       topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
@@ -153,7 +153,7 @@ class HighwatermarkPersistenceTest {
   }
 
   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
-    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
+    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 89a8fd9..c34e4f0 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -126,7 +126,7 @@ class IsrExpirationTest {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
     // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
-    for(replica <- (partition0.assignedReplicas() - leaderReplica))
+    for(replica <- partition0.assignedReplicas() - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, false))
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 7258980..3c30b6b 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -149,8 +149,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
       controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
         staleControllerEpochCallback)
-      TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
-        "Controller epoch should be stale")
+      TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should be stale")
       assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
     } finally {
       controllerChannelManager.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 463cd8a..0885709 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -132,7 +132,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       val consumerOffsets =
         simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 
-      if(consumerOffsets(0) == 1) {
+      if(consumerOffsets.head == 1) {
         offsetChanged = true
       }
     }