You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/09/10 00:21:16 UTC

[1/2] KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun

Updated Branches:
  refs/heads/0.8 da4512174 -> c12d2ea9e


http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4ed88e8..df90695 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,7 +30,7 @@ import scala.Some
 import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
-  
+
   var logDir: File = null
   val time = new MockTime
   var config: KafkaConfig = null
@@ -46,7 +46,7 @@ class LogTest extends JUnitSuite {
   def tearDown() {
     Utils.rm(logDir)
   }
-  
+
   def createEmptyLogs(dir: File, offsets: Int*) {
     for(offset <- offsets) {
       Log.logFilename(dir, offset).createNewFile()
@@ -168,19 +168,19 @@ class LogTest extends JUnitSuite {
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
     assertEquals("Should be no more messages", 0, lastRead.size)
   }
-  
+
   /** Test the case where we have compressed batches of messages */
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
     val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    
+
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-    
+
     def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
-    
+
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
     assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
@@ -202,7 +202,7 @@ class LogTest extends JUnitSuite {
     assertContains(makeRanges(5,8), 5)
     assertContains(makeRanges(5,8), 6)
   }
-  
+
   @Test
   def testEdgeLogRollsStartingAtZero() {
     // first test a log segment starting at 0
@@ -226,7 +226,7 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(i.toString.getBytes))
     val curOffset = log.logEndOffset
-    
+
     // time goes by; the log file is deleted
     log.markDeletedWhile(_ => true)
 
@@ -262,7 +262,7 @@ class LogTest extends JUnitSuite {
         case e:MessageSizeTooLargeException => // this is good
     }
   }
-  
+
   @Test
   def testLogRecoversToCorrectOffset() {
     val numMessages = 100
@@ -276,15 +276,15 @@ class LogTest extends JUnitSuite {
     val lastIndexOffset = log.segments.view.last.index.lastOffset
     val numIndexEntries = log.segments.view.last.index.entries
     log.close()
-    
+
     // test non-recovery case
     log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
     log.close()
-    
-    // test 
+
+    // test
     log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
@@ -305,10 +305,10 @@ class LogTest extends JUnitSuite {
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-    
+
     val lastOffset = log.logEndOffset
     val size = log.size
     log.truncateTo(log.logEndOffset) // keep the entire log
@@ -326,7 +326,7 @@ class LogTest extends JUnitSuite {
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
     log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
@@ -371,14 +371,14 @@ class LogTest extends JUnitSuite {
   def testAppendWithoutOffsetAssignment() {
     for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
       logDir.mkdir()
-      var log = new Log(logDir, 
-                        maxLogFileSize = 64*1024, 
+      var log = new Log(logDir,
+                        maxLogFileSize = 64*1024,
                         maxMessageSize = config.messageMaxBytes,
-                        maxIndexSize = 1000, 
-                        indexIntervalBytes = 10000, 
+                        maxIndexSize = 1000,
+                        indexIntervalBytes = 10000,
                         needsRecovery = true)
       val messages = List("one", "two", "three", "four", "five", "six")
-      val ms = new ByteBufferMessageSet(compressionCodec = codec, 
+      val ms = new ByteBufferMessageSet(compressionCodec = codec,
                                         offsetCounter = new AtomicLong(0),
                                         messages = messages.map(s => new Message(s.getBytes)):_*)
       val firstOffset = ms.toList.head.offset
@@ -391,7 +391,7 @@ class LogTest extends JUnitSuite {
       log.delete()
     }
   }
-  
+
   /**
    * When we open a log any index segments without an associated log segment should be deleted.
    */
@@ -399,22 +399,22 @@ class LogTest extends JUnitSuite {
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
-    
+
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val log = new Log(logDir, 
-                      maxLogFileSize = set.sizeInBytes * 5, 
+    val log = new Log(logDir,
+                      maxLogFileSize = set.sizeInBytes * 5,
                       maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 1, 
+                      maxIndexSize = 1000,
+                      indexIntervalBytes = 1,
                       needsRecovery = false)
-    
+
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-    
+
     // check that we can append to the log
     for(i <- 0 until 10)
       log.append(set)
-      
+
     log.delete()
   }
 
@@ -423,38 +423,38 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
     // create a log
-    var log = new Log(logDir, 
-                      maxLogFileSize = set.sizeInBytes * 5, 
+    var log = new Log(logDir,
+                      maxLogFileSize = set.sizeInBytes * 5,
                       maxMessageSize = config.messageMaxBytes,
-                      maxIndexSize = 1000, 
-                      indexIntervalBytes = 10000, 
+                      maxIndexSize = 1000,
+                      indexIntervalBytes = 10000,
                       needsRecovery = true)
-    
+
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for(i <- 0 until 100)
       log.append(set)
     log.close()
-    log = new Log(logDir, 
-                  maxLogFileSize = set.sizeInBytes * 5, 
+    log = new Log(logDir,
+                  maxLogFileSize = set.sizeInBytes * 5,
                   maxMessageSize = config.messageMaxBytes,
-                  maxIndexSize = 1000, 
-                  indexIntervalBytes = 10000, 
+                  maxIndexSize = 1000,
+                  indexIntervalBytes = 10000,
                   needsRecovery = true)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
   }
-  
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
-      case Some(range) => 
+      case Some(range) =>
         assertTrue(range + " does not contain " + offset, range.contains(offset))
       case None => fail("No range found, but expected to find " + offset)
     }
   }
-  
+
   class SimpleRange(val start: Long, val size: Long) extends Range
-  
+
   def makeRanges(breaks: Int*): Array[Range] = {
     val list = new ArrayList[Range]
     var prior = 0
@@ -464,5 +464,5 @@ class LogTest extends JUnitSuite {
     }
     list.toArray(new Array[Range](list.size))
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
index fe5bc09..7df7405 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
@@ -30,14 +30,15 @@ class KafkaTimerTest extends JUnit3Suite {
     val clock = new ManualClock
     val testRegistry = new MetricsRegistry(clock)
     val metric = testRegistry.newTimer(this.getClass, "TestTimer")
+    val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
 
     val timer = new KafkaTimer(metric)
     timer.time {
       clock.addMillis(1000)
     }
     assertEquals(1, metric.count())
-    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
+    assertTrue((metric.max() - 1000).abs <= Epsilon)
+    assertTrue((metric.min() - 1000).abs <= Epsilon)
   }
 
   private class ManualClock extends Clock {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1781bc0..69c88c7 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -263,7 +263,7 @@ class AsyncProducerTest extends JUnit3Suite {
     }
     catch {
       // should not throw any exception
-      case e => fail("Should not throw any exception")
+      case e: Throwable => fail("Should not throw any exception")
 
     }
   }
@@ -450,7 +450,8 @@ class AsyncProducerTest extends JUnit3Suite {
     val topic = "topic1"
     val msgs = TestUtils.getMsgStrings(5)
     val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
-    val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData)
+    import scala.collection.JavaConversions._
+    val javaProducerData: java.util.List[KeyedMessage[String, String]] = scalaProducerData
 
     val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
     mockScalaProducer.send(scalaProducerData.head)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 29331db..2cabfbb 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       fail("Test should fail because the broker list provided are not valid")
     } catch {
       case e: FailedToSendMessageException =>
-      case oe => fail("fails with exception", oe)
+      case oe: Throwable => fail("fails with exception", oe)
     } finally {
       producer1.close()
     }
@@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     try{
       producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
-      case e => fail("Should succeed sending the message", e)
+      case e: Throwable => fail("Should succeed sending the message", e)
     } finally {
       producer2.close()
     }
@@ -134,7 +134,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     try{
       producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
-      case e => fail("Should succeed sending the message", e)
+      case e: Throwable => fail("Should succeed sending the message", e)
     } finally {
       producer3.close()
     }
@@ -191,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     }
     catch {
       case se: FailedToSendMessageException => true
-      case e => fail("Not expected", e)
+      case e: Throwable => fail("Not expected", e)
     }
     finally {
       producer2.close()
@@ -225,7 +225,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       // on broker 0
       producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
-      case e => fail("Unexpected exception: " + e)
+      case e: Throwable => fail("Unexpected exception: " + e)
     }
 
     // kill the broker
@@ -238,7 +238,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e : TestFailedException => throw e // catch and re-throw the failure message
-      case e2 => // otherwise success
+      case e2: Throwable => // otherwise success
     }
 
     // restart server 1
@@ -287,7 +287,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
       assertEquals(new Message("test".getBytes), messageSet1.next.message)
     } catch {
-      case e => case e: Exception => producer.close; fail("Not expected", e)
+      case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
     }
 
     // stop IO threads and request handling, but leave networking operational

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b3e89c3..3592bff 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -136,7 +136,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
     } catch {
       case e : java.io.IOException => // success
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -205,7 +205,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
       Assert.fail("Should have received timeout exception since request handling is stopped.")
     } catch {
       case e: SocketTimeoutException => /* success */
-      case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
+      case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e)
     }
     val t2 = SystemTime.milliseconds
     // make sure we don't wait fewer than timeoutMs for a response

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 830608f..ee591d0 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -410,7 +410,7 @@ object TestUtils extends Logging {
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
-          case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
+          case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 3158a22..ec3cd29 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -175,7 +175,7 @@ object ConsumerPerformance {
         case _: InterruptedException =>
         case _: ClosedByInterruptException =>
         case _: ConsumerTimeoutException =>
-        case e => throw e
+        case e: Throwable => throw e
       }
       totalMessagesRead.addAndGet(messagesRead)
       totalBytesRead.addAndGet(bytesRead)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index b3858f3..2cdbc9e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -41,7 +41,8 @@ object KafkaBuild extends Build {
   </license>
 </licenses>,
     scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"),
-    crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"),
+    crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
+    excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
     scalaVersion := "2.8.0",
     version := "0.8.0-beta1",
     publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),


[2/2] git commit: KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun

Posted by ne...@apache.org.
KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c12d2ea9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c12d2ea9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c12d2ea9

Branch: refs/heads/0.8
Commit: c12d2ea9e5b4bdcf9aeb07c89c69553a9f270c82
Parents: da45121
Author: Christopher Freeman <cf...@linkedin.com>
Authored: Mon Sep 9 15:20:47 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Sep 9 15:21:05 2013 -0700

----------------------------------------------------------------------
 core/build.sbt                                  |  1 +
 core/src/main/scala/kafka/Kafka.scala           |  2 +-
 .../kafka/admin/AddPartitionsCommand.scala      |  2 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../scala/kafka/admin/CreateTopicCommand.scala  |  2 +-
 .../scala/kafka/admin/DeleteTopicCommand.scala  |  2 +-
 .../scala/kafka/admin/ListTopicCommand.scala    |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  6 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  4 +-
 .../main/scala/kafka/client/ClientUtils.scala   |  2 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |  2 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala  |  6 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |  4 +-
 .../scala/kafka/consumer/SimpleConsumer.scala   |  2 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 10 +--
 .../consumer/ZookeeperTopicEventWatcher.scala   |  2 +-
 .../controller/ControllerChannelManager.scala   |  4 +-
 .../kafka/controller/KafkaController.scala      | 16 ++--
 .../controller/PartitionStateMachine.scala      | 20 +++--
 .../kafka/controller/ReplicaStateMachine.scala  |  4 +-
 .../main/scala/kafka/javaapi/FetchRequest.scala |  6 +-
 .../main/scala/kafka/javaapi/Implicits.scala    |  6 ++
 .../scala/kafka/javaapi/OffsetRequest.scala     |  5 +-
 .../scala/kafka/javaapi/TopicMetadata.scala     | 24 ++++--
 .../kafka/javaapi/TopicMetadataRequest.scala    |  8 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 15 ++--
 .../javaapi/message/ByteBufferMessageSet.scala  |  4 +-
 .../scala/kafka/javaapi/producer/Producer.scala |  3 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +-
 .../network/BoundedByteBufferReceive.scala      |  2 +-
 .../scala/kafka/producer/SyncProducer.scala     |  2 +-
 .../producer/async/DefaultEventHandler.scala    |  4 +-
 .../producer/async/ProducerSendThread.scala     |  4 +-
 .../kafka/server/AbstractFetcherThread.scala    |  6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  8 +-
 .../kafka/server/KafkaServerStartable.scala     |  4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  2 +-
 .../scala/kafka/tools/ImportZkOffsets.scala     |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala   |  2 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  4 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |  2 +-
 .../main/scala/kafka/utils/Annotations.scala    | 36 --------
 .../scala/kafka/utils/Annotations_2.8.scala     | 36 ++++++++
 .../scala/kafka/utils/Annotations_2.9+.scala    | 38 +++++++++
 core/src/main/scala/kafka/utils/Json.scala      |  2 +-
 .../src/main/scala/kafka/utils/Mx4jLoader.scala |  2 +-
 core/src/main/scala/kafka/utils/Pool.scala      | 12 ++-
 core/src/main/scala/kafka/utils/Utils.scala     |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 24 +++---
 .../unit/kafka/admin/AddPartitionsTest.scala    |  4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 12 +--
 .../ZookeeperConsumerConnectorTest.scala        |  6 +-
 .../ZookeeperConsumerConnectorTest.scala        |  5 +-
 .../message/BaseMessageSetTestCases.scala       |  7 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 88 ++++++++++----------
 .../unit/kafka/metrics/KafkaTimerTest.scala     |  5 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |  5 +-
 .../unit/kafka/producer/ProducerTest.scala      | 14 ++--
 .../unit/kafka/producer/SyncProducerTest.scala  |  4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 .../scala/kafka/perf/ConsumerPerformance.scala  |  2 +-
 project/Build.scala                             |  3 +-
 64 files changed, 302 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index c54cf44..b5bcb44 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
 libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
   deps :+ (sv match {
     case "2.8.0" => "org.scalatest" %  "scalatest" % "1.2" % "test"
+    case v if v.startsWith("2.10") =>  "org.scalatest" %% "scalatest" % "1.9.1" % "test"
     case _       => "org.scalatest" %% "scalatest" % "1.8" % "test"
   })
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dafb1ee..988014a 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -47,7 +47,7 @@ object Kafka extends Logging {
       kafkaServerStartble.awaitShutdown
     }
     catch {
-      case e => fatal(e)
+      case e: Throwable => fatal(e)
     }
     System.exit(0)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index 5757c32..7f03708 100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging {
       addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
       println("adding partitions succeeded!")
     } catch {
-      case e =>
+      case e: Throwable =>
         println("adding partitions failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c399bc7..d6ab275 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -90,7 +90,7 @@ object AdminUtils extends Logging {
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
       case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2: Throwable => throw new AdministrationException(e2.toString)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index 21c1186..84c2095 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -74,7 +74,7 @@ object CreateTopicCommand extends Logging {
       createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
     } catch {
-      case e =>
+      case e: Throwable =>
         println("creation failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
index 3da4518..804b331 100644
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
@@ -54,7 +54,7 @@ object DeleteTopicCommand {
       println("deletion succeeded!")
     }
     catch {
-      case e =>
+      case e: Throwable =>
         println("delection failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index c760cc0..eed49e1 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -72,7 +72,7 @@ object ListTopicCommand {
         showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
     }
     catch {
-      case e =>
+      case e: Throwable =>
         println("list topic failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d5de5f3..34ed7aa 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
       println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Failed to start preferred replica election")
         println(Utils.stackTrace(e))
     } finally {
@@ -104,7 +104,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
         val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
         throw new AdministrationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2: Throwable => throw new AdministrationException(e2.toString)
     }
   }
 }
@@ -116,7 +116,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
       PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
     } catch {
-      case e => throw new AdminCommandFailedException("Admin command failed", e)
+      case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index aa61fa1..f333d29 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging {
           "The replica assignment is \n" + partitionsToBeReassigned.toString())
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
@@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
         val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
         throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))
-      case e => error("Admin command failed", e); false
+      case e: Throwable => error("Admin command failed", e); false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc526ec..1d2f81b 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -54,7 +54,7 @@ object ClientUtils extends Logging{
         fetchMetaDataSucceeded = true
       }
       catch {
-        case e =>
+        case e: Throwable =>
           warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
             .format(correlationId, topics, shuffledBrokers(i).toString), e)
           t = e

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index b03dea2..9407ed2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -42,7 +42,7 @@ private[kafka] object Broker {
           throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
     } catch {
-      case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+      case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 719beb5..48fa7a3 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging {
           formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
           numMessages += 1
         } catch {
-          case e =>
+          case e: Throwable =>
             if (skipMessageOnError)
               error("Error processing message, skipping this message: ", e)
             else
@@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging {
         }
       }
     } catch {
-      case e => error("Error processing message, stopping consumer: ", e)
+      case e: Throwable => error("Error processing message, stopping consumer: ", e)
     }
     System.err.println("Consumed %d messages".format(numMessages))
     System.out.flush()
@@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging {
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
-      case _ => // swallow
+      case _: Throwable => // swallow
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index fa6b213..8c03308 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           }
         }
       } catch {
-        case t => {
+        case t: Throwable => {
             if (!isRunning.get())
               throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
@@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           try {
             addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
           } catch {
-            case t => {
+            case t: Throwable => {
                 if (!isRunning.get())
                   throw t /* If this thread is stopped, propagate this exception to kill the thread. */
                 else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 4395fe3..fac64aa 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -84,7 +84,7 @@ class SimpleConsumer(val host: String,
               disconnect()
               throw ioe
           }
-        case e => throw e
+        case e: Throwable => throw e
       }
       response
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c8e8406..a3eb53e 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging {
         case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         error("error parsing consumer json string " + topicCountString, e)
         throw e
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e7a692a..81bf0bd 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -175,7 +175,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             zkClient = null
           }
         } catch {
-          case e =>
+          case e: Throwable =>
             fatal("error during consumer connector shutdown", e)
         }
         info("ZKConsumerConnector shut down completed")
@@ -332,7 +332,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (doRebalance)
               syncedRebalance
           } catch {
-            case t => error("error during syncedRebalance", t)
+            case t: Throwable => error("error during syncedRebalance", t)
           }
         }
         info("stopping watcher executor thread for consumer " + consumerIdString)
@@ -384,7 +384,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               cluster = getCluster(zkClient)
               done = rebalance(cluster)
             } catch {
-              case e =>
+              case e: Throwable =>
                 /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                  * For example, a ZK node can disappear between the time we get all children and the time we try to get
                  * the value of a child. Just let this go since another rebalance will be triggered.
@@ -461,7 +461,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             " for topic " + topic + " with consumers: " + curConsumers)
 
           for (consumerThreadId <- consumerThreadIdSet) {
-            val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
             assert(myConsumerPosition >= 0)
             val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
             val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
@@ -581,7 +581,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
             info("waiting for the partition ownership to be deleted: " + partition)
             false
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
       }
       val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index df83baa..a67c193 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
           }
         }
         catch {
-          case e =>
+          case e: Throwable =>
             error("error in handling child changes", e)
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ed1ce0b..beca460 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
       brokerStateInfo(brokerId).requestSendThread.shutdown()
       brokerStateInfo.remove(brokerId)
     }catch {
-      case e => error("Error while removing broker by the controller", e)
+      case e: Throwable => error("Error while removing broker by the controller", e)
     }
   }
 
@@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int,
         }
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
         // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
         channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ab18b7a..aef41ad 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,14 +89,14 @@ object KafkaController extends Logging {
         case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
       }
     } catch {
-      case t =>
+      case t: Throwable =>
         // It may be due to an incompatible controller register version
         warn("Failed to parse the controller info as json. "
           + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
         try {
           return controllerInfoString.toInt
         } catch {
-          case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+          case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
         }
     }
   }
@@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           .format(topicAndPartition))
       }
     } catch {
-      case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+      case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
       // remove the partition from the admin path to unblock the admin client
       removePartitionFromReassignedPartitions(topicAndPartition)
     }
@@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
-      case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
       removePartitionsFromPreferredReplicaElection(partitions)
     }
@@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         } catch {
           case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
             "Aborting controller startup procedure")
-          case oe => error("Error while incrementing controller epoch", oe)
+          case oe: Throwable => error("Error while incrementing controller epoch", oe)
         }
-      case oe => error("Error while incrementing controller epoch", oe)
+      case oe: Throwable => error("Error while incrementing controller epoch", oe)
 
     }
     info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
@@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
       case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
-      case e2 => throw new KafkaException(e2.toString)
+      case e2: Throwable => throw new KafkaException(e2.toString)
     }
   }
 
@@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
         }
       }
     }catch {
-      case e => error("Error while handling partition reassignment", e)
+      case e: Throwable => error("Error while handling partition reassignment", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a084830..829163a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,7 +17,8 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConversions._
+import collection.JavaConversions
+import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     } catch {
-      case e => error("Error while moving some partitions to the online state", e)
+      case e: Throwable => error("Error while moving some partitions to the online state", e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
     }
   }
@@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
-      case e => error("Error while moving some partitions to %s state".format(targetState), e)
+      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
     }
   }
@@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     } catch {
       case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
-      case sce =>
+      case sce: Throwable =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
         stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
         throw new StateChangeFailedException(failMsg, sce)
@@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.controllerLock synchronized {
         if (hasStarted.get) {
           try {
-            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
-            val currentChildren = JavaConversions.asBuffer(children).toSet
+            val currentChildren = {
+              import JavaConversions._
+              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+              (children: Buffer[String]).toSet
+            }
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
             //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             if(newTopics.size > 0)
               controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
           } catch {
-            case e => error("Error while handling new topic", e )
+            case e: Throwable => error("Error while handling new topic", e )
           }
           // TODO: kafka-330  Handle deleted topics
         }
@@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
           controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
         } catch {
-          case e => error("Error while handling add partitions for data path " + dataPath, e )
+          case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c964857..212c05d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
-      case e => error("Error while moving some replicas to %s state".format(targetState), e)
+      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
     }
   }
 
@@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               if(deadBrokerIds.size > 0)
                 controller.onBrokerFailure(deadBrokerIds.toSeq)
             } catch {
-              case e => error("Error while handling broker changes", e)
+              case e: Throwable => error("Error while handling broker changes", e)
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index b475240..6abdc17 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -17,10 +17,10 @@
 
 package kafka.javaapi
 
-import scala.collection.JavaConversions
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionFetchInfo}
+import scala.collection.mutable
 
 class FetchRequest(correlationId: Int,
                    clientId: String,
@@ -28,8 +28,10 @@ class FetchRequest(correlationId: Int,
                    minBytes: Int,
                    requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
 
+  import scala.collection.JavaConversions._
+
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
     kafka.api.FetchRequest(
       correlationId = correlationId,
       clientId = clientId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index ee0a71d..0af3a67 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -40,4 +40,10 @@ private[javaapi] object Implicits extends Logging {
       case None => null.asInstanceOf[T]
     }
   }
+
+  // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
+  implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
+    import scala.collection.JavaConversions._
+    l: collection.mutable.Buffer[A]
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 1c77ff8..d88c7e4 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -19,7 +19,7 @@ package kafka.javaapi
 
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionOffsetRequestInfo}
-import collection.JavaConversions
+import scala.collection.mutable
 import java.nio.ByteBuffer
 
 
@@ -28,7 +28,8 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
                     clientId: String) {
 
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    import collection.JavaConversions._
+    val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
     kafka.api.OffsetRequest(
       requestInfo = scalaMap,
       versionId = versionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 97b6dcd..d08c3f4 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,16 +17,20 @@
 package kafka.javaapi
 
 import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions
 
 private[javaapi] object MetadataListImplicits {
   implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
-  java.util.List[kafka.javaapi.TopicMetadata] =
-    asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+  java.util.List[kafka.javaapi.TopicMetadata] = {
+    import JavaConversions._
+    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+  }
 
   implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
-  java.util.List[kafka.javaapi.PartitionMetadata] =
-    asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+  java.util.List[kafka.javaapi.PartitionMetadata] = {
+    import JavaConversions._
+    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+  }
 }
 
 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
     underlying.leader
   }
 
-  def replicas: java.util.List[Broker] = asList(underlying.replicas)
+  def replicas: java.util.List[Broker] = {
+    import JavaConversions._
+    underlying.replicas
+  }
 
-  def isr: java.util.List[Broker] = asList(underlying.isr)
+  def isr: java.util.List[Broker] = {
+    import JavaConversions._
+    underlying.isr
+  }
 
   def errorCode: Short = underlying.errorCode
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 5f80df7..05757a1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -18,7 +18,7 @@ package kafka.javaapi
 
 import kafka.api._
 import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.mutable
 
 class TopicMetadataRequest(val versionId: Short,
                            override val correlationId: Int,
@@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short,
                            val topics: java.util.List[String])
     extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
 
-  val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+  val underlying: kafka.api.TopicMetadataRequest = {
+    import scala.collection.JavaConversions._
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+  }
 
   def this(topics: java.util.List[String]) =
     this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 14c4c8a..58e83f6 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,7 +18,8 @@ package kafka.javaapi.consumer
 
 import kafka.serializer._
 import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import scala.collection.mutable
+import scala.collection.JavaConversions
 
 
 /**
@@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         keyDecoder: Decoder[K],
         valueDecoder: Decoder[V])
       : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
-    import scala.collection.JavaConversions._
 
-    val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+    val scalaTopicCountMap: Map[String, Int] = {
+      import JavaConversions._
+      Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+    }
     val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
     val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
     for ((topic, streams) <- scalaReturn) {
@@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
     
-  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
-    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
+    import JavaConversions._
+    underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
+  }
 
   def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = 
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..fecee8d 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import kafka.message._
+import kafka.javaapi.Implicits.javaListToScalaBuffer
 
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
   
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+    // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 7265328..c465da5 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.producer
 
 import kafka.producer.ProducerConfig
 import kafka.producer.KeyedMessage
+import scala.collection.mutable
 
 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
@@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
    */
   def send(messages: java.util.List[KeyedMessage[K,V]]) {
     import collection.JavaConversions._
-    underlying.send(asBuffer(messages):_*)
+    underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4771d11..739e22a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       } catch {
-        case e =>
+        case e: Throwable =>
           error("Error flushing topic " + log.topicName, e)
           e match {
             case _: IOException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
index cab1864..a442545 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
       case e: OutOfMemoryError =>
         error("OOME with size " + size, e)
         throw e
-      case e2 =>
+      case e2: Throwable =>
         throw e2
     }
     buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 306f200..419156e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
           // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
           disconnect()
           throw e
-        case e => throw e
+        case e: Throwable => throw e
       }
       response
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 2e36d3b..c151032 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         else
           serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
-        case t =>
+        case t: Throwable =>
           producerStats.serializationErrorRate.mark()
           if (isSync) {
             throw t
@@ -178,7 +178,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
       case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
       case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
-      case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
+      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b41a49..42e9c74 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String,
     try {
       processEvents
     }catch {
-      case e => error("Error in sending events: ", e)
+      case e: Throwable => error("Error in sending events: ", e)
     }finally {
       shutdownLatch.countDown
     }
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
       if(size > 0)
         handler.handle(events)
     }catch {
-      case e => error("Error in handling batch of " + size + " events", e)
+      case e: Throwable => error("Error in handling batch of " + size + " events", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d5addb3..a5fc96d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
-      case t =>
+      case t: Throwable =>
         if (isRunning.get) {
           warn("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
@@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       //    should get fixed in the subsequent fetches
                       logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                    case e =>
+                    case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
                   }
@@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                       .format(currentOffset.get, topic, partitionId, newOffset))
                   } catch {
-                    case e =>
+                    case e: Throwable =>
                       warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cd02aab..4679e18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
                producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
           new ProduceResult(topicAndPartition, nle)
-        case e =>
+        case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
           error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
                 fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
               new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-            case t =>
+            case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
               error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
-        case e =>
+        case e: Throwable =>
           warn("Error while responding to offset request", e)
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
       }
@@ -481,7 +481,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                     isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
                 new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
               } catch {
-                case e =>
+                case e: Throwable =>
                   error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
                   new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
                     ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 5be65e9..acda52b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
       server.startup()
     }
     catch {
-      case e =>
+      case e: Throwable =>
         fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
         shutdown()
         System.exit(1)
@@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
       server.shutdown()
     }
     catch {
-      case e =>
+      case e: Throwable =>
         fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
         System.exit(1)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f551243..03ba60e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,7 +223,7 @@ class ReplicaManager(val config: KafkaConfig,
             makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
                          leaderAndISRRequest.correlationId)
         } catch {
-          case e =>
+          case e: Throwable =>
             val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
                             "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
                                                                 leaderAndISRRequest.controllerEpoch, topicAndPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f1f0625..33b7360 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
           }
           if (leaderId != -1)
             debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-        case e2 =>
+        case e2: Throwable =>
           error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
           leaderId = -1
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 55709b5..c8023ee 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging {
       try {
         ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
       } catch {
-        case e => e.printStackTrace()
+        case e: Throwable => e.printStackTrace()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 7e424e7..747a675 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -86,7 +86,7 @@ object JmxTool extends Logging {
       else
         List(null)
 
-    val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+    val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
     val allAttributes: Iterable[(ObjectName, Array[String])] =
       names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 6fb545a..f0f871c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -129,7 +129,7 @@ object MirrorMaker extends Logging {
     try {
       streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
     } catch {
-      case t =>
+      case t: Throwable =>
         fatal("Unable to create stream - shutting down mirror maker.")
         connectors.foreach(_.shutdown)
     }
@@ -204,7 +204,7 @@ object MirrorMaker extends Logging {
           }
         }
       } catch {
-        case e =>
+        case e: Throwable =>
           fatal("Stream unexpectedly exited.", e)
       } finally {
         shutdownLatch.countDown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 3cfa384..c889835 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging {
                 formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
                 numMessagesConsumed += 1
               } catch {
-                case e =>
+                case e: Throwable =>
                   if (skipMessageOnError)
                     error("Error processing message, skipping this message: ", e)
                   else

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation 
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations_2.8.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
new file mode 100644
index 0000000..28269eb
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation 
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
new file mode 100644
index 0000000..ab95ce1
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import scala.annotation.StaticAnnotation
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation 
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index f80b2cc..03fb06f 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -32,7 +32,7 @@ object Json extends Logging {
       try {
         JSON.parseFull(input)
       } catch {
-        case t =>
+        case t: Throwable =>
           throw new KafkaException("Can't parse json string: %s".format(input), t)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 64d84cc..db9f20b 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -64,7 +64,7 @@ object Mx4jLoader extends Logging {
 	  case e: ClassNotFoundException => {
         info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
       }
-      case e => {
+      case e: Throwable => {
         warn("Could not start register mbean in JMX", e);
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9a86eab..9ddcde7 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.util.ArrayList
 import java.util.concurrent._
+import collection.mutable
 import collection.JavaConversions
 import kafka.common.KafkaException
 import java.lang.Object
@@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
   
   def remove(key: K): V = pool.remove(key)
   
-  def keys = JavaConversions.asSet(pool.keySet())
+  def keys: mutable.Set[K] = {
+    import JavaConversions._
+    pool.keySet()
+  }
   
-  def values: Iterable[V] = 
-    JavaConversions.asIterable(new ArrayList[V](pool.values()))
+  def values: Iterable[V] = {
+    import JavaConversions._
+    new ArrayList[V](pool.values())
+  }
   
   def clear() { pool.clear() }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e83eb5f..e0a5a27 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -67,7 +67,7 @@ object Utils extends Logging {
           fun()
         }
         catch {
-          case t =>
+          case t: Throwable =>
             // log any error and the stack trace
             error("error in loggedRunnable", t)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca1ce12..6eede1b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -271,7 +271,7 @@ object ZkUtils extends Logging {
           storedData = readData(client, path)._1
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -281,7 +281,7 @@ object ZkUtils extends Logging {
           info(path + " exists with value " + data + " during connection loss; this is ok")
         }
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -321,7 +321,7 @@ object ZkUtils extends Logging {
             case None => // the node disappeared; retry creating the ephemeral node immediately
           }
         }
-        case e2 => throw e2
+        case e2: Throwable => throw e2
       }
     }
   }
@@ -360,10 +360,10 @@ object ZkUtils extends Logging {
         } catch {
           case e: ZkNodeExistsException =>
             client.writeData(path, data)
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -416,7 +416,7 @@ object ZkUtils extends Logging {
         createParentPath(client, path)
         client.createEphemeral(path, data)
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
   
@@ -428,7 +428,7 @@ object ZkUtils extends Logging {
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
         false
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -439,7 +439,7 @@ object ZkUtils extends Logging {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
   
@@ -449,7 +449,7 @@ object ZkUtils extends Logging {
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
-      case _ => // swallow
+      case _: Throwable => // swallow
     }
   }
 
@@ -466,7 +466,7 @@ object ZkUtils extends Logging {
                       } catch {
                         case e: ZkNoNodeException =>
                           (None, stat)
-                        case e2 => throw e2
+                        case e2: Throwable => throw e2
                       }
     dataAndStat
   }
@@ -484,7 +484,7 @@ object ZkUtils extends Logging {
       client.getChildren(path)
     } catch {
       case e: ZkNoNodeException => return Nil
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -675,7 +675,7 @@ object ZkUtils extends Logging {
           case nne: ZkNoNodeException =>
             ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-          case e2 => throw new AdministrationException(e2.toString)
+          case e2: Throwable => throw new AdministrationException(e2.toString)
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 06be990..2436289 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
       fail("Topic should not exist")
     } catch {
       case e: AdministrationException => //this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
       fail("Add partitions should fail")
     } catch {
       case e: AdministrationException => //this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index dc0013f..881e69b 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -38,7 +38,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
 
     // test wrong replication factor
@@ -48,7 +48,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
 
     // correct assignment
@@ -84,7 +84,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
 
     // non-exist brokers
@@ -95,7 +95,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
 
     // inconsistent replication factor
@@ -106,7 +106,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
 
     // good assignment
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       fail("shouldn't be able to create a topic already exists")
     } catch {
       case e: TopicExistsException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..268d14e 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
         fail("should get an exception")
       } catch {
         case e: ConsumerTimeoutException => // this is ok
-        case e => throw e
+        case e: Throwable => throw e
       }
     }
 
@@ -406,10 +406,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
-    import scala.collection.JavaConversions
+    import scala.collection.JavaConversions._
     val children = zkClient.getChildren(path)
     Collections.sort(children)
-    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    val childrenAsSeq : Seq[java.lang.String] = (children: mutable.Buffer[String]).toSeq
     childrenAsSeq.map(partition =>
       (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..e8e454f 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
       messages ++= ms
       import scala.collection.JavaConversions._
-      javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
+      javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
     }
     javaProducer.close
     messages
@@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def getMessages(nMessagesPerThread: Int, 
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
     var messages: List[String] = Nil
-    val topicMessageStreams = asMap(jTopicMessageStreams)
+    import scala.collection.JavaConversions._
+    val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator

http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index abee11b..726399e 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -29,8 +29,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
   def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
     import scala.collection.JavaConversions._
-    val messages = asIterable(messageSet)
-    messages.map(m => m.message).iterator
+    messageSet.map(m => m.message).iterator
   }
 
   @Test
@@ -44,7 +43,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
     import scala.collection.JavaConversions._
     val m = createMessageSet(messages)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+    TestUtils.checkEquals(m.iterator, m.iterator)
   }
 
   @Test
@@ -52,7 +51,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
     import scala.collection.JavaConversions._
     val m = createMessageSet(messages, DefaultCompressionCodec)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+    TestUtils.checkEquals(m.iterator, m.iterator)
   }
 
   @Test