You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/14 00:37:31 UTC
[1/3] KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
Updated Branches:
refs/heads/trunk fed901cad -> 324936609
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4ed88e8..df90695 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -30,7 +30,7 @@ import scala.Some
import kafka.server.KafkaConfig
class LogTest extends JUnitSuite {
-
+
var logDir: File = null
val time = new MockTime
var config: KafkaConfig = null
@@ -46,7 +46,7 @@ class LogTest extends JUnitSuite {
def tearDown() {
Utils.rm(logDir)
}
-
+
def createEmptyLogs(dir: File, offsets: Int*) {
for(offset <- offsets) {
Log.logFilename(dir, offset).createNewFile()
@@ -168,19 +168,19 @@ class LogTest extends JUnitSuite {
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
assertEquals("Should be no more messages", 0, lastRead.size)
}
-
+
/** Test the case where we have compressed batches of messages */
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-
+
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-
+
def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
-
+
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
@@ -202,7 +202,7 @@ class LogTest extends JUnitSuite {
assertContains(makeRanges(5,8), 5)
assertContains(makeRanges(5,8), 6)
}
-
+
@Test
def testEdgeLogRollsStartingAtZero() {
// first test a log segment starting at 0
@@ -226,7 +226,7 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
val curOffset = log.logEndOffset
-
+
// time goes by; the log file is deleted
log.markDeletedWhile(_ => true)
@@ -262,7 +262,7 @@ class LogTest extends JUnitSuite {
case e:MessageSizeTooLargeException => // this is good
}
}
-
+
@Test
def testLogRecoversToCorrectOffset() {
val numMessages = 100
@@ -276,15 +276,15 @@ class LogTest extends JUnitSuite {
val lastIndexOffset = log.segments.view.last.index.lastOffset
val numIndexEntries = log.segments.view.last.index.entries
log.close()
-
+
// test non-recovery case
log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
log.close()
-
- // test
+
+ // test
log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
@@ -305,10 +305,10 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
-
+
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-
+
val lastOffset = log.logEndOffset
val size = log.size
log.truncateTo(log.logEndOffset) // keep the entire log
@@ -326,7 +326,7 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
-
+
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
@@ -371,14 +371,14 @@ class LogTest extends JUnitSuite {
def testAppendWithoutOffsetAssignment() {
for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
logDir.mkdir()
- var log = new Log(logDir,
- maxLogFileSize = 64*1024,
+ var log = new Log(logDir,
+ maxLogFileSize = 64*1024,
maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
needsRecovery = true)
val messages = List("one", "two", "three", "four", "five", "six")
- val ms = new ByteBufferMessageSet(compressionCodec = codec,
+ val ms = new ByteBufferMessageSet(compressionCodec = codec,
offsetCounter = new AtomicLong(0),
messages = messages.map(s => new Message(s.getBytes)):_*)
val firstOffset = ms.toList.head.offset
@@ -391,7 +391,7 @@ class LogTest extends JUnitSuite {
log.delete()
}
}
-
+
/**
* When we open a log any index segments without an associated log segment should be deleted.
*/
@@ -399,22 +399,22 @@ class LogTest extends JUnitSuite {
def testBogusIndexSegmentsAreRemoved() {
val bogusIndex1 = Log.indexFilename(logDir, 0)
val bogusIndex2 = Log.indexFilename(logDir, 5)
-
+
val set = TestUtils.singleMessageSet("test".getBytes())
- val log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ val log = new Log(logDir,
+ maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 1,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 1,
needsRecovery = false)
-
+
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-
+
// check that we can append to the log
for(i <- 0 until 10)
log.append(set)
-
+
log.delete()
}
@@ -423,38 +423,38 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
// create a log
- var log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ var log = new Log(logDir,
+ maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
needsRecovery = true)
-
+
// add enough messages to roll over several segments then close and re-open and attempt to truncate
for(i <- 0 until 100)
log.append(set)
log.close()
- log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ log = new Log(logDir,
+ maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
needsRecovery = true)
log.truncateTo(3)
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
-
+
def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match {
- case Some(range) =>
+ case Some(range) =>
assertTrue(range + " does not contain " + offset, range.contains(offset))
case None => fail("No range found, but expected to find " + offset)
}
}
-
+
class SimpleRange(val start: Long, val size: Long) extends Range
-
+
def makeRanges(breaks: Int*): Array[Range] = {
val list = new ArrayList[Range]
var prior = 0
@@ -464,5 +464,5 @@ class LogTest extends JUnitSuite {
}
list.toArray(new Array[Range](list.size))
}
-
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
index fe5bc09..7df7405 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
@@ -30,14 +30,15 @@ class KafkaTimerTest extends JUnit3Suite {
val clock = new ManualClock
val testRegistry = new MetricsRegistry(clock)
val metric = testRegistry.newTimer(this.getClass, "TestTimer")
+ val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
val timer = new KafkaTimer(metric)
timer.time {
clock.addMillis(1000)
}
assertEquals(1, metric.count())
- assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
- assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
+ assertTrue((metric.max() - 1000).abs <= Epsilon)
+ assertTrue((metric.min() - 1000).abs <= Epsilon)
}
private class ManualClock extends Clock {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1781bc0..69c88c7 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -263,7 +263,7 @@ class AsyncProducerTest extends JUnit3Suite {
}
catch {
// should not throw any exception
- case e => fail("Should not throw any exception")
+ case e: Throwable => fail("Should not throw any exception")
}
}
@@ -450,7 +450,8 @@ class AsyncProducerTest extends JUnit3Suite {
val topic = "topic1"
val msgs = TestUtils.getMsgStrings(5)
val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
- val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData)
+ import scala.collection.JavaConversions._
+ val javaProducerData: java.util.List[KeyedMessage[String, String]] = scalaProducerData
val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
mockScalaProducer.send(scalaProducerData.head)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 29331db..2cabfbb 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
fail("Test should fail because the broker list provided are not valid")
} catch {
case e: FailedToSendMessageException =>
- case oe => fail("fails with exception", oe)
+ case oe: Throwable => fail("fails with exception", oe)
} finally {
producer1.close()
}
@@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try{
producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
} catch {
- case e => fail("Should succeed sending the message", e)
+ case e: Throwable => fail("Should succeed sending the message", e)
} finally {
producer2.close()
}
@@ -134,7 +134,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try{
producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
} catch {
- case e => fail("Should succeed sending the message", e)
+ case e: Throwable => fail("Should succeed sending the message", e)
} finally {
producer3.close()
}
@@ -191,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
}
catch {
case se: FailedToSendMessageException => true
- case e => fail("Not expected", e)
+ case e: Throwable => fail("Not expected", e)
}
finally {
producer2.close()
@@ -225,7 +225,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// on broker 0
producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
} catch {
- case e => fail("Unexpected exception: " + e)
+ case e: Throwable => fail("Unexpected exception: " + e)
}
// kill the broker
@@ -238,7 +238,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
fail("Should fail since no leader exists for the partition.")
} catch {
case e : TestFailedException => throw e // catch and re-throw the failure message
- case e2 => // otherwise success
+ case e2: Throwable => // otherwise success
}
// restart server 1
@@ -287,7 +287,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test".getBytes), messageSet1.next.message)
} catch {
- case e => case e: Exception => producer.close; fail("Not expected", e)
+ case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
}
// stop IO threads and request handling, but leave networking operational
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b3e89c3..3592bff 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -136,7 +136,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
} catch {
case e : java.io.IOException => // success
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -205,7 +205,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.fail("Should have received timeout exception since request handling is stopped.")
} catch {
case e: SocketTimeoutException => /* success */
- case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
+ case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e)
}
val t2 = SystemTime.milliseconds
// make sure we don't wait fewer than timeoutMs for a response
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 830608f..ee591d0 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -410,7 +410,7 @@ object TestUtils extends Logging {
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
} catch {
- case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
+ case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 3158a22..ec3cd29 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -175,7 +175,7 @@ object ConsumerPerformance {
case _: InterruptedException =>
case _: ClosedByInterruptException =>
case _: ConsumerTimeoutException =>
- case e => throw e
+ case e: Throwable => throw e
}
totalMessagesRead.addAndGet(messagesRead)
totalBytesRead.addAndGet(bytesRead)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index b3858f3..2cdbc9e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -41,7 +41,8 @@ object KafkaBuild extends Build {
</license>
</licenses>,
scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"),
- crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"),
+ crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
+ excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
scalaVersion := "2.8.0",
version := "0.8.0-beta1",
publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
[3/3] git commit: merge from 0.8 c12d2ea9e5b4bdcf9aeb07c89c69553a9f270c82 to trunk
Posted by ju...@apache.org.
merge from 0.8 c12d2ea9e5b4bdcf9aeb07c89c69553a9f270c82 to trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32493660
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32493660
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32493660
Branch: refs/heads/trunk
Commit: 324936609f9e087aa4168e7f7581148e34b346a4
Parents: fed901c c12d2ea
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Sep 13 15:36:53 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Sep 13 15:36:53 2013 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/3] git commit: KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
Posted by ju...@apache.org.
KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c12d2ea9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c12d2ea9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c12d2ea9
Branch: refs/heads/trunk
Commit: c12d2ea9e5b4bdcf9aeb07c89c69553a9f270c82
Parents: da45121
Author: Christopher Freeman <cf...@linkedin.com>
Authored: Mon Sep 9 15:20:47 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Sep 9 15:21:05 2013 -0700
----------------------------------------------------------------------
core/build.sbt | 1 +
core/src/main/scala/kafka/Kafka.scala | 2 +-
.../kafka/admin/AddPartitionsCommand.scala | 2 +-
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../scala/kafka/admin/CreateTopicCommand.scala | 2 +-
.../scala/kafka/admin/DeleteTopicCommand.scala | 2 +-
.../scala/kafka/admin/ListTopicCommand.scala | 2 +-
.../PreferredReplicaLeaderElectionCommand.scala | 6 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 4 +-
.../main/scala/kafka/client/ClientUtils.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 2 +-
.../scala/kafka/consumer/ConsoleConsumer.scala | 6 +-
.../kafka/consumer/ConsumerFetcherManager.scala | 4 +-
.../scala/kafka/consumer/SimpleConsumer.scala | 2 +-
.../main/scala/kafka/consumer/TopicCount.scala | 2 +-
.../consumer/ZookeeperConsumerConnector.scala | 10 +--
.../consumer/ZookeeperTopicEventWatcher.scala | 2 +-
.../controller/ControllerChannelManager.scala | 4 +-
.../kafka/controller/KafkaController.scala | 16 ++--
.../controller/PartitionStateMachine.scala | 20 +++--
.../kafka/controller/ReplicaStateMachine.scala | 4 +-
.../main/scala/kafka/javaapi/FetchRequest.scala | 6 +-
.../main/scala/kafka/javaapi/Implicits.scala | 6 ++
.../scala/kafka/javaapi/OffsetRequest.scala | 5 +-
.../scala/kafka/javaapi/TopicMetadata.scala | 24 ++++--
.../kafka/javaapi/TopicMetadataRequest.scala | 8 +-
.../consumer/ZookeeperConsumerConnector.scala | 15 ++--
.../javaapi/message/ByteBufferMessageSet.scala | 4 +-
.../scala/kafka/javaapi/producer/Producer.scala | 3 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
.../network/BoundedByteBufferReceive.scala | 2 +-
.../scala/kafka/producer/SyncProducer.scala | 2 +-
.../producer/async/DefaultEventHandler.scala | 4 +-
.../producer/async/ProducerSendThread.scala | 4 +-
.../kafka/server/AbstractFetcherThread.scala | 6 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 8 +-
.../kafka/server/KafkaServerStartable.scala | 4 +-
.../scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/server/ZookeeperLeaderElector.scala | 2 +-
.../scala/kafka/tools/ImportZkOffsets.scala | 2 +-
core/src/main/scala/kafka/tools/JmxTool.scala | 2 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 4 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 2 +-
.../main/scala/kafka/utils/Annotations.scala | 36 --------
.../scala/kafka/utils/Annotations_2.8.scala | 36 ++++++++
.../scala/kafka/utils/Annotations_2.9+.scala | 38 +++++++++
core/src/main/scala/kafka/utils/Json.scala | 2 +-
.../src/main/scala/kafka/utils/Mx4jLoader.scala | 2 +-
core/src/main/scala/kafka/utils/Pool.scala | 12 ++-
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 24 +++---
.../unit/kafka/admin/AddPartitionsTest.scala | 4 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 12 +--
.../ZookeeperConsumerConnectorTest.scala | 6 +-
.../ZookeeperConsumerConnectorTest.scala | 5 +-
.../message/BaseMessageSetTestCases.scala | 7 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 88 ++++++++++----------
.../unit/kafka/metrics/KafkaTimerTest.scala | 5 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 5 +-
.../unit/kafka/producer/ProducerTest.scala | 14 ++--
.../unit/kafka/producer/SyncProducerTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../scala/kafka/perf/ConsumerPerformance.scala | 2 +-
project/Build.scala | 3 +-
64 files changed, 302 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index c54cf44..b5bcb44 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
deps :+ (sv match {
case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+ case v if v.startsWith("2.10") => "org.scalatest" %% "scalatest" % "1.9.1" % "test"
case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
})
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dafb1ee..988014a 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -47,7 +47,7 @@ object Kafka extends Logging {
kafkaServerStartble.awaitShutdown
}
catch {
- case e => fatal(e)
+ case e: Throwable => fatal(e)
}
System.exit(0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index 5757c32..7f03708 100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging {
addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("adding partitions succeeded!")
} catch {
- case e =>
+ case e: Throwable =>
println("adding partitions failed because of " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c399bc7..d6ab275 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -90,7 +90,7 @@ object AdminUtils extends Logging {
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2: Throwable => throw new AdministrationException(e2.toString)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index 21c1186..84c2095 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -74,7 +74,7 @@ object CreateTopicCommand extends Logging {
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
} catch {
- case e =>
+ case e: Throwable =>
println("creation failed because of " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
index 3da4518..804b331 100644
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
@@ -54,7 +54,7 @@ object DeleteTopicCommand {
println("deletion succeeded!")
}
catch {
- case e =>
+ case e: Throwable =>
println("delection failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index c760cc0..eed49e1 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -72,7 +72,7 @@ object ListTopicCommand {
showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
}
catch {
- case e =>
+ case e: Throwable =>
println("list topic failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d5de5f3..34ed7aa 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
} catch {
- case e =>
+ case e: Throwable =>
println("Failed to start preferred replica election")
println(Utils.stackTrace(e))
} finally {
@@ -104,7 +104,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
throw new AdministrationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2: Throwable => throw new AdministrationException(e2.toString)
}
}
}
@@ -116,7 +116,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal
val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
} catch {
- case e => throw new AdminCommandFailedException("Admin command failed", e)
+ case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index aa61fa1..f333d29 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging {
"The replica assignment is \n" + partitionsToBeReassigned.toString())
}
} catch {
- case e =>
+ case e: Throwable =>
println("Partitions reassignment failed due to " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
@@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
throw new AdminCommandFailedException("Partition reassignment currently in " +
"progress for %s. Aborting operation".format(partitionsBeingReassigned))
- case e => error("Admin command failed", e); false
+ case e: Throwable => error("Admin command failed", e); false
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc526ec..1d2f81b 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -54,7 +54,7 @@ object ClientUtils extends Logging{
fetchMetaDataSucceeded = true
}
catch {
- case e =>
+ case e: Throwable =>
warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
.format(correlationId, topics, shuffledBrokers(i).toString), e)
t = e
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index b03dea2..9407ed2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -42,7 +42,7 @@ private[kafka] object Broker {
throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
} catch {
- case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+ case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 719beb5..48fa7a3 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging {
formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
numMessages += 1
} catch {
- case e =>
+ case e: Throwable =>
if (skipMessageOnError)
error("Error processing message, skipping this message: ", e)
else
@@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging {
}
}
} catch {
- case e => error("Error processing message, stopping consumer: ", e)
+ case e: Throwable => error("Error processing message, stopping consumer: ", e)
}
System.err.println("Consumed %d messages".format(numMessages))
System.out.flush()
@@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging {
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _: Throwable => // swallow
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index fa6b213..8c03308 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
}
} catch {
- case t => {
+ case t: Throwable => {
if (!isRunning.get())
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else
@@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
try {
addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
} catch {
- case t => {
+ case t: Throwable => {
if (!isRunning.get())
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 4395fe3..fac64aa 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -84,7 +84,7 @@ class SimpleConsumer(val host: String,
disconnect()
throw ioe
}
- case e => throw e
+ case e: Throwable => throw e
}
response
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c8e8406..a3eb53e 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging {
case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
}
} catch {
- case e =>
+ case e: Throwable =>
error("error parsing consumer json string " + topicCountString, e)
throw e
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e7a692a..81bf0bd 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -175,7 +175,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient = null
}
} catch {
- case e =>
+ case e: Throwable =>
fatal("error during consumer connector shutdown", e)
}
info("ZKConsumerConnector shut down completed")
@@ -332,7 +332,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (doRebalance)
syncedRebalance
} catch {
- case t => error("error during syncedRebalance", t)
+ case t: Throwable => error("error during syncedRebalance", t)
}
}
info("stopping watcher executor thread for consumer " + consumerIdString)
@@ -384,7 +384,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
cluster = getCluster(zkClient)
done = rebalance(cluster)
} catch {
- case e =>
+ case e: Throwable =>
/** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
* For example, a ZK node can disappear between the time we get all children and the time we try to get
* the value of a child. Just let this go since another rebalance will be triggered.
@@ -461,7 +461,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
" for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- consumerThreadIdSet) {
- val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+ val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
@@ -581,7 +581,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition)
false
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index df83baa..a67c193 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
}
}
catch {
- case e =>
+ case e: Throwable =>
error("error in handling child changes", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ed1ce0b..beca460 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
brokerStateInfo(brokerId).requestSendThread.shutdown()
brokerStateInfo.remove(brokerId)
}catch {
- case e => error("Error while removing broker by the controller", e)
+ case e: Throwable => error("Error while removing broker by the controller", e)
}
}
@@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int,
}
}
} catch {
- case e =>
+ case e: Throwable =>
warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ab18b7a..aef41ad 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,14 +89,14 @@ object KafkaController extends Logging {
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
- case t =>
+ case t: Throwable =>
// It may be due to an incompatible controller register version
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
try {
return controllerInfoString.toInt
} catch {
- case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+ case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
}
}
}
@@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
.format(topicAndPartition))
}
} catch {
- case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+ case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(topicAndPartition)
}
@@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
} catch {
- case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+ case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions)
}
@@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
} catch {
case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
"Aborting controller startup procedure")
- case oe => error("Error while incrementing controller epoch", oe)
+ case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
- case oe => error("Error while incrementing controller epoch", oe)
+ case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
@@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
- case e2 => throw new KafkaException(e2.toString)
+ case e2: Throwable => throw new KafkaException(e2.toString)
}
}
@@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
}
}
}catch {
- case e => error("Error while handling partition reassignment", e)
+ case e: Throwable => error("Error while handling partition reassignment", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a084830..829163a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,7 +17,8 @@
package kafka.controller
import collection._
-import collection.JavaConversions._
+import collection.JavaConversions
+import collection.mutable.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
} catch {
- case e => error("Error while moving some partitions to the online state", e)
+ case e: Throwable => error("Error while moving some partitions to the online state", e)
// TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
}
}
@@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
- case e => error("Error while moving some partitions to %s state".format(targetState), e)
+ case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
// TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
}
}
@@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
} catch {
case lenne: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
- case sce =>
+ case sce: Throwable =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg, sce)
@@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controllerContext.controllerLock synchronized {
if (hasStarted.get) {
try {
- debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
- val currentChildren = JavaConversions.asBuffer(children).toSet
+ val currentChildren = {
+ import JavaConversions._
+ debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+ (children: Buffer[String]).toSet
+ }
val newTopics = currentChildren -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- currentChildren
// val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if(newTopics.size > 0)
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
} catch {
- case e => error("Error while handling new topic", e )
+ case e: Throwable => error("Error while handling new topic", e )
}
// TODO: kafka-330 Handle deleted topics
}
@@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
} catch {
- case e => error("Error while handling add partitions for data path " + dataPath, e )
+ case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c964857..212c05d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
- case e => error("Error while moving some replicas to %s state".format(targetState), e)
+ case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
}
}
@@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
if(deadBrokerIds.size > 0)
controller.onBrokerFailure(deadBrokerIds.toSeq)
} catch {
- case e => error("Error while handling broker changes", e)
+ case e: Throwable => error("Error while handling broker changes", e)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index b475240..6abdc17 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -17,10 +17,10 @@
package kafka.javaapi
-import scala.collection.JavaConversions
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionFetchInfo}
+import scala.collection.mutable
class FetchRequest(correlationId: Int,
clientId: String,
@@ -28,8 +28,10 @@ class FetchRequest(correlationId: Int,
minBytes: Int,
requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+ import scala.collection.JavaConversions._
+
val underlying = {
- val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
kafka.api.FetchRequest(
correlationId = correlationId,
clientId = clientId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index ee0a71d..0af3a67 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -40,4 +40,10 @@ private[javaapi] object Implicits extends Logging {
case None => null.asInstanceOf[T]
}
}
+
+ // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
+ implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
+ import scala.collection.JavaConversions._
+ l: collection.mutable.Buffer[A]
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 1c77ff8..d88c7e4 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -19,7 +19,7 @@ package kafka.javaapi
import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionOffsetRequestInfo}
-import collection.JavaConversions
+import scala.collection.mutable
import java.nio.ByteBuffer
@@ -28,7 +28,8 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
clientId: String) {
val underlying = {
- val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ import collection.JavaConversions._
+ val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
kafka.api.OffsetRequest(
requestInfo = scalaMap,
versionId = versionId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 97b6dcd..d08c3f4 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,16 +17,20 @@
package kafka.javaapi
import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions
private[javaapi] object MetadataListImplicits {
implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
- java.util.List[kafka.javaapi.TopicMetadata] =
- asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+ java.util.List[kafka.javaapi.TopicMetadata] = {
+ import JavaConversions._
+ topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+ }
implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
- java.util.List[kafka.javaapi.PartitionMetadata] =
- asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+ java.util.List[kafka.javaapi.PartitionMetadata] = {
+ import JavaConversions._
+ partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+ }
}
class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
underlying.leader
}
- def replicas: java.util.List[Broker] = asList(underlying.replicas)
+ def replicas: java.util.List[Broker] = {
+ import JavaConversions._
+ underlying.replicas
+ }
- def isr: java.util.List[Broker] = asList(underlying.isr)
+ def isr: java.util.List[Broker] = {
+ import JavaConversions._
+ underlying.isr
+ }
def errorCode: Short = underlying.errorCode
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 5f80df7..05757a1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -18,7 +18,7 @@ package kafka.javaapi
import kafka.api._
import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.mutable
class TopicMetadataRequest(val versionId: Short,
override val correlationId: Int,
@@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short,
val topics: java.util.List[String])
extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
- val underlying: kafka.api.TopicMetadataRequest =
- new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+ val underlying: kafka.api.TopicMetadataRequest = {
+ import scala.collection.JavaConversions._
+ new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+ }
def this(topics: java.util.List[String]) =
this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 14c4c8a..58e83f6 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,7 +18,8 @@ package kafka.javaapi.consumer
import kafka.serializer._
import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import scala.collection.mutable
+import scala.collection.JavaConversions
/**
@@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
keyDecoder: Decoder[K],
valueDecoder: Decoder[V])
: java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
- import scala.collection.JavaConversions._
- val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+ val scalaTopicCountMap: Map[String, Int] = {
+ import JavaConversions._
+ Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+ }
val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
for ((topic, streams) <- scalaReturn) {
@@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
- def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
- asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+ def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
+ import JavaConversions._
+ underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
+ }
def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..fecee8d 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong
import scala.reflect.BeanProperty
import java.nio.ByteBuffer
import kafka.message._
+import kafka.javaapi.Implicits.javaListToScalaBuffer
class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
- this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+ // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
+ this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
}
def this(messages: java.util.List[Message]) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 7265328..c465da5 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.producer
import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
+import scala.collection.mutable
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
@@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
*/
def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._
- underlying.send(asBuffer(messages):_*)
+ underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4771d11..739e22a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
if(timeSinceLastFlush >= logFlushInterval)
log.flush
} catch {
- case e =>
+ case e: Throwable =>
error("Error flushing topic " + log.topicName, e)
e match {
case _: IOException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
index cab1864..a442545 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
case e: OutOfMemoryError =>
error("OOME with size " + size, e)
throw e
- case e2 =>
+ case e2: Throwable =>
throw e2
}
buffer
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 306f200..419156e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
// no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
disconnect()
throw e
- case e => throw e
+ case e: Throwable => throw e
}
response
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 2e36d3b..c151032 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
else
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
} catch {
- case t =>
+ case t: Throwable =>
producerStats.serializationErrorRate.mark()
if (isSync) {
throw t
@@ -178,7 +178,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
- case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
+ case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b41a49..42e9c74 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String,
try {
processEvents
}catch {
- case e => error("Error in sending events: ", e)
+ case e: Throwable => error("Error in sending events: ", e)
}finally {
shutdownLatch.countDown
}
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
if(size > 0)
handler.handle(events)
}catch {
- case e => error("Error in handling batch of " + size + " events", e)
+ case e: Throwable => error("Error in handling batch of " + size + " events", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d5addb3..a5fc96d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
response = simpleConsumer.fetch(fetchRequest)
} catch {
- case t =>
+ case t: Throwable =>
if (isRunning.get) {
warn("Error in fetch %s".format(fetchRequest), t)
partitionMapLock synchronized {
@@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
- case e =>
+ case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
.format(topic, partitionId, currentOffset.get), e)
}
@@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset))
} catch {
- case e =>
+ case e: Throwable =>
warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
partitionsWithError += topicAndPartition
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cd02aab..4679e18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
new ProduceResult(topicAndPartition, nle)
- case e =>
+ case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- case t =>
+ case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
- case e =>
+ case e: Throwable =>
warn("Error while responding to offset request", e)
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
}
@@ -481,7 +481,7 @@ class KafkaApis(val requestChannel: RequestChannel,
isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
- case e =>
+ case e: Throwable =>
error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 5be65e9..acda52b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
server.startup()
}
catch {
- case e =>
+ case e: Throwable =>
fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
shutdown()
System.exit(1)
@@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
server.shutdown()
}
catch {
- case e =>
+ case e: Throwable =>
fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
System.exit(1)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f551243..03ba60e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,7 +223,7 @@ class ReplicaManager(val config: KafkaConfig,
makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
leaderAndISRRequest.correlationId)
} catch {
- case e =>
+ case e: Throwable =>
val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
"epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.controllerEpoch, topicAndPartition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f1f0625..33b7360 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
}
if (leaderId != -1)
debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- case e2 =>
+ case e2: Throwable =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
leaderId = -1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 55709b5..c8023ee 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging {
try {
ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
} catch {
- case e => e.printStackTrace()
+ case e: Throwable => e.printStackTrace()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 7e424e7..747a675 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -86,7 +86,7 @@ object JmxTool extends Logging {
else
List(null)
- val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+ val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
val allAttributes: Iterable[(ObjectName, Array[String])] =
names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 6fb545a..f0f871c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -129,7 +129,7 @@ object MirrorMaker extends Logging {
try {
streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
} catch {
- case t =>
+ case t: Throwable =>
fatal("Unable to create stream - shutting down mirror maker.")
connectors.foreach(_.shutdown)
}
@@ -204,7 +204,7 @@ object MirrorMaker extends Logging {
}
}
} catch {
- case e =>
+ case e: Throwable =>
fatal("Stream unexpectedly exited.", e)
} finally {
shutdownLatch.countDown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 3cfa384..c889835 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging {
formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
numMessagesConsumed += 1
} catch {
- case e =>
+ case e: Throwable =>
if (skipMessageOnError)
error("Error processing message, skipping this message: ", e)
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations_2.8.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
new file mode 100644
index 0000000..28269eb
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
new file mode 100644
index 0000000..ab95ce1
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import scala.annotation.StaticAnnotation
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index f80b2cc..03fb06f 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -32,7 +32,7 @@ object Json extends Logging {
try {
JSON.parseFull(input)
} catch {
- case t =>
+ case t: Throwable =>
throw new KafkaException("Can't parse json string: %s".format(input), t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 64d84cc..db9f20b 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -64,7 +64,7 @@ object Mx4jLoader extends Logging {
case e: ClassNotFoundException => {
info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
}
- case e => {
+ case e: Throwable => {
warn("Could not start register mbean in JMX", e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9a86eab..9ddcde7 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,6 +19,7 @@ package kafka.utils
import java.util.ArrayList
import java.util.concurrent._
+import collection.mutable
import collection.JavaConversions
import kafka.common.KafkaException
import java.lang.Object
@@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
def remove(key: K): V = pool.remove(key)
- def keys = JavaConversions.asSet(pool.keySet())
+ def keys: mutable.Set[K] = {
+ import JavaConversions._
+ pool.keySet()
+ }
- def values: Iterable[V] =
- JavaConversions.asIterable(new ArrayList[V](pool.values()))
+ def values: Iterable[V] = {
+ import JavaConversions._
+ new ArrayList[V](pool.values())
+ }
def clear() { pool.clear() }
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e83eb5f..e0a5a27 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -67,7 +67,7 @@ object Utils extends Logging {
fun()
}
catch {
- case t =>
+ case t: Throwable =>
// log any error and the stack trace
error("error in loggedRunnable", t)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca1ce12..6eede1b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -271,7 +271,7 @@ object ZkUtils extends Logging {
storedData = readData(client, path)._1
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -281,7 +281,7 @@ object ZkUtils extends Logging {
info(path + " exists with value " + data + " during connection loss; this is ok")
}
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -321,7 +321,7 @@ object ZkUtils extends Logging {
case None => // the node disappeared; retry creating the ephemeral node immediately
}
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
}
@@ -360,10 +360,10 @@ object ZkUtils extends Logging {
} catch {
case e: ZkNodeExistsException =>
client.writeData(path, data)
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -416,7 +416,7 @@ object ZkUtils extends Logging {
createParentPath(client, path)
client.createEphemeral(path, data)
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -428,7 +428,7 @@ object ZkUtils extends Logging {
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
false
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -439,7 +439,7 @@ object ZkUtils extends Logging {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -449,7 +449,7 @@ object ZkUtils extends Logging {
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _: Throwable => // swallow
}
}
@@ -466,7 +466,7 @@ object ZkUtils extends Logging {
} catch {
case e: ZkNoNodeException =>
(None, stat)
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
dataAndStat
}
@@ -484,7 +484,7 @@ object ZkUtils extends Logging {
client.getChildren(path)
} catch {
case e: ZkNoNodeException => return Nil
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -675,7 +675,7 @@ object ZkUtils extends Logging {
case nne: ZkNoNodeException =>
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2: Throwable => throw new AdministrationException(e2.toString)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 06be990..2436289 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
fail("Topic should not exist")
} catch {
case e: AdministrationException => //this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
fail("Add partitions should fail")
} catch {
case e: AdministrationException => //this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index dc0013f..881e69b 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -38,7 +38,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// test wrong replication factor
@@ -48,7 +48,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// correct assignment
@@ -84,7 +84,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// non-exist brokers
@@ -95,7 +95,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// inconsistent replication factor
@@ -106,7 +106,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// good assignment
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
fail("shouldn't be able to create a topic already exists")
} catch {
case e: TopicExistsException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..268d14e 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
fail("should get an exception")
} catch {
case e: ConsumerTimeoutException => // this is ok
- case e => throw e
+ case e: Throwable => throw e
}
}
@@ -406,10 +406,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
- import scala.collection.JavaConversions
+ import scala.collection.JavaConversions._
val children = zkClient.getChildren(path)
Collections.sort(children)
- val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+ val childrenAsSeq : Seq[java.lang.String] = (children: mutable.Buffer[String]).toSeq
childrenAsSeq.map(partition =>
(partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..e8e454f 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
messages ++= ms
import scala.collection.JavaConversions._
- javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
+ javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
}
javaProducer.close
messages
@@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def getMessages(nMessagesPerThread: Int,
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
var messages: List[String] = Nil
- val topicMessageStreams = asMap(jTopicMessageStreams)
+ import scala.collection.JavaConversions._
+ val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index abee11b..726399e 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -29,8 +29,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
import scala.collection.JavaConversions._
- val messages = asIterable(messageSet)
- messages.map(m => m.message).iterator
+ messageSet.map(m => m.message).iterator
}
@Test
@@ -44,7 +43,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
import scala.collection.JavaConversions._
val m = createMessageSet(messages)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+ TestUtils.checkEquals(m.iterator, m.iterator)
}
@Test
@@ -52,7 +51,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
import scala.collection.JavaConversions._
val m = createMessageSet(messages, DefaultCompressionCodec)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+ TestUtils.checkEquals(m.iterator, m.iterator)
}
@Test