You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/09/10 00:21:17 UTC
[2/2] git commit: KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c12d2ea9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c12d2ea9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c12d2ea9
Branch: refs/heads/0.8
Commit: c12d2ea9e5b4bdcf9aeb07c89c69553a9f270c82
Parents: da45121
Author: Christopher Freeman <cf...@linkedin.com>
Authored: Mon Sep 9 15:20:47 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Sep 9 15:21:05 2013 -0700
----------------------------------------------------------------------
core/build.sbt | 1 +
core/src/main/scala/kafka/Kafka.scala | 2 +-
.../kafka/admin/AddPartitionsCommand.scala | 2 +-
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../scala/kafka/admin/CreateTopicCommand.scala | 2 +-
.../scala/kafka/admin/DeleteTopicCommand.scala | 2 +-
.../scala/kafka/admin/ListTopicCommand.scala | 2 +-
.../PreferredReplicaLeaderElectionCommand.scala | 6 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 4 +-
.../main/scala/kafka/client/ClientUtils.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 2 +-
.../scala/kafka/consumer/ConsoleConsumer.scala | 6 +-
.../kafka/consumer/ConsumerFetcherManager.scala | 4 +-
.../scala/kafka/consumer/SimpleConsumer.scala | 2 +-
.../main/scala/kafka/consumer/TopicCount.scala | 2 +-
.../consumer/ZookeeperConsumerConnector.scala | 10 +--
.../consumer/ZookeeperTopicEventWatcher.scala | 2 +-
.../controller/ControllerChannelManager.scala | 4 +-
.../kafka/controller/KafkaController.scala | 16 ++--
.../controller/PartitionStateMachine.scala | 20 +++--
.../kafka/controller/ReplicaStateMachine.scala | 4 +-
.../main/scala/kafka/javaapi/FetchRequest.scala | 6 +-
.../main/scala/kafka/javaapi/Implicits.scala | 6 ++
.../scala/kafka/javaapi/OffsetRequest.scala | 5 +-
.../scala/kafka/javaapi/TopicMetadata.scala | 24 ++++--
.../kafka/javaapi/TopicMetadataRequest.scala | 8 +-
.../consumer/ZookeeperConsumerConnector.scala | 15 ++--
.../javaapi/message/ByteBufferMessageSet.scala | 4 +-
.../scala/kafka/javaapi/producer/Producer.scala | 3 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
.../network/BoundedByteBufferReceive.scala | 2 +-
.../scala/kafka/producer/SyncProducer.scala | 2 +-
.../producer/async/DefaultEventHandler.scala | 4 +-
.../producer/async/ProducerSendThread.scala | 4 +-
.../kafka/server/AbstractFetcherThread.scala | 6 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 8 +-
.../kafka/server/KafkaServerStartable.scala | 4 +-
.../scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/server/ZookeeperLeaderElector.scala | 2 +-
.../scala/kafka/tools/ImportZkOffsets.scala | 2 +-
core/src/main/scala/kafka/tools/JmxTool.scala | 2 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 4 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 2 +-
.../main/scala/kafka/utils/Annotations.scala | 36 --------
.../scala/kafka/utils/Annotations_2.8.scala | 36 ++++++++
.../scala/kafka/utils/Annotations_2.9+.scala | 38 +++++++++
core/src/main/scala/kafka/utils/Json.scala | 2 +-
.../src/main/scala/kafka/utils/Mx4jLoader.scala | 2 +-
core/src/main/scala/kafka/utils/Pool.scala | 12 ++-
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 24 +++---
.../unit/kafka/admin/AddPartitionsTest.scala | 4 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 12 +--
.../ZookeeperConsumerConnectorTest.scala | 6 +-
.../ZookeeperConsumerConnectorTest.scala | 5 +-
.../message/BaseMessageSetTestCases.scala | 7 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 88 ++++++++++----------
.../unit/kafka/metrics/KafkaTimerTest.scala | 5 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 5 +-
.../unit/kafka/producer/ProducerTest.scala | 14 ++--
.../unit/kafka/producer/SyncProducerTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../scala/kafka/perf/ConsumerPerformance.scala | 2 +-
project/Build.scala | 3 +-
64 files changed, 302 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index c54cf44..b5bcb44 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
deps :+ (sv match {
case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+ case v if v.startsWith("2.10") => "org.scalatest" %% "scalatest" % "1.9.1" % "test"
case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
})
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dafb1ee..988014a 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -47,7 +47,7 @@ object Kafka extends Logging {
kafkaServerStartble.awaitShutdown
}
catch {
- case e => fatal(e)
+ case e: Throwable => fatal(e)
}
System.exit(0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index 5757c32..7f03708 100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging {
addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("adding partitions succeeded!")
} catch {
- case e =>
+ case e: Throwable =>
println("adding partitions failed because of " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c399bc7..d6ab275 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -90,7 +90,7 @@ object AdminUtils extends Logging {
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2: Throwable => throw new AdministrationException(e2.toString)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index 21c1186..84c2095 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -74,7 +74,7 @@ object CreateTopicCommand extends Logging {
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
} catch {
- case e =>
+ case e: Throwable =>
println("creation failed because of " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
index 3da4518..804b331 100644
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
@@ -54,7 +54,7 @@ object DeleteTopicCommand {
println("deletion succeeded!")
}
catch {
- case e =>
+ case e: Throwable =>
println("delection failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/ListTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index c760cc0..eed49e1 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -72,7 +72,7 @@ object ListTopicCommand {
showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
}
catch {
- case e =>
+ case e: Throwable =>
println("list topic failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d5de5f3..34ed7aa 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
} catch {
- case e =>
+ case e: Throwable =>
println("Failed to start preferred replica election")
println(Utils.stackTrace(e))
} finally {
@@ -104,7 +104,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
throw new AdministrationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2: Throwable => throw new AdministrationException(e2.toString)
}
}
}
@@ -116,7 +116,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal
val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
} catch {
- case e => throw new AdminCommandFailedException("Admin command failed", e)
+ case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index aa61fa1..f333d29 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging {
"The replica assignment is \n" + partitionsToBeReassigned.toString())
}
} catch {
- case e =>
+ case e: Throwable =>
println("Partitions reassignment failed due to " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
@@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
throw new AdminCommandFailedException("Partition reassignment currently in " +
"progress for %s. Aborting operation".format(partitionsBeingReassigned))
- case e => error("Admin command failed", e); false
+ case e: Throwable => error("Admin command failed", e); false
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc526ec..1d2f81b 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -54,7 +54,7 @@ object ClientUtils extends Logging{
fetchMetaDataSucceeded = true
}
catch {
- case e =>
+ case e: Throwable =>
warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
.format(correlationId, topics, shuffledBrokers(i).toString), e)
t = e
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index b03dea2..9407ed2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -42,7 +42,7 @@ private[kafka] object Broker {
throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
} catch {
- case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+ case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 719beb5..48fa7a3 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging {
formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
numMessages += 1
} catch {
- case e =>
+ case e: Throwable =>
if (skipMessageOnError)
error("Error processing message, skipping this message: ", e)
else
@@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging {
}
}
} catch {
- case e => error("Error processing message, stopping consumer: ", e)
+ case e: Throwable => error("Error processing message, stopping consumer: ", e)
}
System.err.println("Consumed %d messages".format(numMessages))
System.out.flush()
@@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging {
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _: Throwable => // swallow
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index fa6b213..8c03308 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
}
} catch {
- case t => {
+ case t: Throwable => {
if (!isRunning.get())
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else
@@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
try {
addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
} catch {
- case t => {
+ case t: Throwable => {
if (!isRunning.get())
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 4395fe3..fac64aa 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -84,7 +84,7 @@ class SimpleConsumer(val host: String,
disconnect()
throw ioe
}
- case e => throw e
+ case e: Throwable => throw e
}
response
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c8e8406..a3eb53e 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging {
case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
}
} catch {
- case e =>
+ case e: Throwable =>
error("error parsing consumer json string " + topicCountString, e)
throw e
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e7a692a..81bf0bd 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -175,7 +175,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient = null
}
} catch {
- case e =>
+ case e: Throwable =>
fatal("error during consumer connector shutdown", e)
}
info("ZKConsumerConnector shut down completed")
@@ -332,7 +332,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (doRebalance)
syncedRebalance
} catch {
- case t => error("error during syncedRebalance", t)
+ case t: Throwable => error("error during syncedRebalance", t)
}
}
info("stopping watcher executor thread for consumer " + consumerIdString)
@@ -384,7 +384,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
cluster = getCluster(zkClient)
done = rebalance(cluster)
} catch {
- case e =>
+ case e: Throwable =>
/** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
* For example, a ZK node can disappear between the time we get all children and the time we try to get
* the value of a child. Just let this go since another rebalance will be triggered.
@@ -461,7 +461,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
" for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- consumerThreadIdSet) {
- val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+ val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
@@ -581,7 +581,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition)
false
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index df83baa..a67c193 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
}
}
catch {
- case e =>
+ case e: Throwable =>
error("error in handling child changes", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ed1ce0b..beca460 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
brokerStateInfo(brokerId).requestSendThread.shutdown()
brokerStateInfo.remove(brokerId)
}catch {
- case e => error("Error while removing broker by the controller", e)
+ case e: Throwable => error("Error while removing broker by the controller", e)
}
}
@@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int,
}
}
} catch {
- case e =>
+ case e: Throwable =>
warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ab18b7a..aef41ad 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,14 +89,14 @@ object KafkaController extends Logging {
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
- case t =>
+ case t: Throwable =>
// It may be due to an incompatible controller register version
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
try {
return controllerInfoString.toInt
} catch {
- case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+ case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
}
}
}
@@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
.format(topicAndPartition))
}
} catch {
- case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+ case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(topicAndPartition)
}
@@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
} catch {
- case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+ case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions)
}
@@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
} catch {
case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
"Aborting controller startup procedure")
- case oe => error("Error while incrementing controller epoch", oe)
+ case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
- case oe => error("Error while incrementing controller epoch", oe)
+ case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
@@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
- case e2 => throw new KafkaException(e2.toString)
+ case e2: Throwable => throw new KafkaException(e2.toString)
}
}
@@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
}
}
}catch {
- case e => error("Error while handling partition reassignment", e)
+ case e: Throwable => error("Error while handling partition reassignment", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a084830..829163a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,7 +17,8 @@
package kafka.controller
import collection._
-import collection.JavaConversions._
+import collection.JavaConversions
+import collection.mutable.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
} catch {
- case e => error("Error while moving some partitions to the online state", e)
+ case e: Throwable => error("Error while moving some partitions to the online state", e)
// TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
}
}
@@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
- case e => error("Error while moving some partitions to %s state".format(targetState), e)
+ case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
// TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
}
}
@@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
} catch {
case lenne: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
- case sce =>
+ case sce: Throwable =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg, sce)
@@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controllerContext.controllerLock synchronized {
if (hasStarted.get) {
try {
- debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
- val currentChildren = JavaConversions.asBuffer(children).toSet
+ val currentChildren = {
+ import JavaConversions._
+ debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+ (children: Buffer[String]).toSet
+ }
val newTopics = currentChildren -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- currentChildren
// val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if(newTopics.size > 0)
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
} catch {
- case e => error("Error while handling new topic", e )
+ case e: Throwable => error("Error while handling new topic", e )
}
// TODO: kafka-330 Handle deleted topics
}
@@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
} catch {
- case e => error("Error while handling add partitions for data path " + dataPath, e )
+ case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c964857..212c05d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
}catch {
- case e => error("Error while moving some replicas to %s state".format(targetState), e)
+ case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
}
}
@@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
if(deadBrokerIds.size > 0)
controller.onBrokerFailure(deadBrokerIds.toSeq)
} catch {
- case e => error("Error while handling broker changes", e)
+ case e: Throwable => error("Error while handling broker changes", e)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index b475240..6abdc17 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -17,10 +17,10 @@
package kafka.javaapi
-import scala.collection.JavaConversions
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionFetchInfo}
+import scala.collection.mutable
class FetchRequest(correlationId: Int,
clientId: String,
@@ -28,8 +28,10 @@ class FetchRequest(correlationId: Int,
minBytes: Int,
requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+ import scala.collection.JavaConversions._
+
val underlying = {
- val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
kafka.api.FetchRequest(
correlationId = correlationId,
clientId = clientId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index ee0a71d..0af3a67 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -40,4 +40,10 @@ private[javaapi] object Implicits extends Logging {
case None => null.asInstanceOf[T]
}
}
+
+ // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
+ implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
+ import scala.collection.JavaConversions._
+ l: collection.mutable.Buffer[A]
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 1c77ff8..d88c7e4 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -19,7 +19,7 @@ package kafka.javaapi
import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionOffsetRequestInfo}
-import collection.JavaConversions
+import scala.collection.mutable
import java.nio.ByteBuffer
@@ -28,7 +28,8 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
clientId: String) {
val underlying = {
- val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ import collection.JavaConversions._
+ val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
kafka.api.OffsetRequest(
requestInfo = scalaMap,
versionId = versionId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 97b6dcd..d08c3f4 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,16 +17,20 @@
package kafka.javaapi
import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions
private[javaapi] object MetadataListImplicits {
implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
- java.util.List[kafka.javaapi.TopicMetadata] =
- asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+ java.util.List[kafka.javaapi.TopicMetadata] = {
+ import JavaConversions._
+ topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+ }
implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
- java.util.List[kafka.javaapi.PartitionMetadata] =
- asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+ java.util.List[kafka.javaapi.PartitionMetadata] = {
+ import JavaConversions._
+ partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+ }
}
class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
underlying.leader
}
- def replicas: java.util.List[Broker] = asList(underlying.replicas)
+ def replicas: java.util.List[Broker] = {
+ import JavaConversions._
+ underlying.replicas
+ }
- def isr: java.util.List[Broker] = asList(underlying.isr)
+ def isr: java.util.List[Broker] = {
+ import JavaConversions._
+ underlying.isr
+ }
def errorCode: Short = underlying.errorCode
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 5f80df7..05757a1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -18,7 +18,7 @@ package kafka.javaapi
import kafka.api._
import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.mutable
class TopicMetadataRequest(val versionId: Short,
override val correlationId: Int,
@@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short,
val topics: java.util.List[String])
extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
- val underlying: kafka.api.TopicMetadataRequest =
- new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+ val underlying: kafka.api.TopicMetadataRequest = {
+ import scala.collection.JavaConversions._
+ new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+ }
def this(topics: java.util.List[String]) =
this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 14c4c8a..58e83f6 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,7 +18,8 @@ package kafka.javaapi.consumer
import kafka.serializer._
import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import scala.collection.mutable
+import scala.collection.JavaConversions
/**
@@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
keyDecoder: Decoder[K],
valueDecoder: Decoder[V])
: java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
- import scala.collection.JavaConversions._
- val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+ val scalaTopicCountMap: Map[String, Int] = {
+ import JavaConversions._
+ Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+ }
val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
for ((topic, streams) <- scalaReturn) {
@@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
- def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
- asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+ def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
+ import JavaConversions._
+ underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
+ }
def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..fecee8d 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong
import scala.reflect.BeanProperty
import java.nio.ByteBuffer
import kafka.message._
+import kafka.javaapi.Implicits.javaListToScalaBuffer
class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
- this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+ // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
+ this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
}
def this(messages: java.util.List[Message]) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 7265328..c465da5 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.producer
import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
+import scala.collection.mutable
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
@@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
*/
def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._
- underlying.send(asBuffer(messages):_*)
+ underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4771d11..739e22a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
if(timeSinceLastFlush >= logFlushInterval)
log.flush
} catch {
- case e =>
+ case e: Throwable =>
error("Error flushing topic " + log.topicName, e)
e match {
case _: IOException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
index cab1864..a442545 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
case e: OutOfMemoryError =>
error("OOME with size " + size, e)
throw e
- case e2 =>
+ case e2: Throwable =>
throw e2
}
buffer
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 306f200..419156e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
// no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
disconnect()
throw e
- case e => throw e
+ case e: Throwable => throw e
}
response
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 2e36d3b..c151032 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
else
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
} catch {
- case t =>
+ case t: Throwable =>
producerStats.serializationErrorRate.mark()
if (isSync) {
throw t
@@ -178,7 +178,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
- case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
+ case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b41a49..42e9c74 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String,
try {
processEvents
}catch {
- case e => error("Error in sending events: ", e)
+ case e: Throwable => error("Error in sending events: ", e)
}finally {
shutdownLatch.countDown
}
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
if(size > 0)
handler.handle(events)
}catch {
- case e => error("Error in handling batch of " + size + " events", e)
+ case e: Throwable => error("Error in handling batch of " + size + " events", e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d5addb3..a5fc96d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
response = simpleConsumer.fetch(fetchRequest)
} catch {
- case t =>
+ case t: Throwable =>
if (isRunning.get) {
warn("Error in fetch %s".format(fetchRequest), t)
partitionMapLock synchronized {
@@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
- case e =>
+ case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
.format(topic, partitionId, currentOffset.get), e)
}
@@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset))
} catch {
- case e =>
+ case e: Throwable =>
warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
partitionsWithError += topicAndPartition
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cd02aab..4679e18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
new ProduceResult(topicAndPartition, nle)
- case e =>
+ case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- case t =>
+ case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
- case e =>
+ case e: Throwable =>
warn("Error while responding to offset request", e)
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
}
@@ -481,7 +481,7 @@ class KafkaApis(val requestChannel: RequestChannel,
isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
- case e =>
+ case e: Throwable =>
error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 5be65e9..acda52b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
server.startup()
}
catch {
- case e =>
+ case e: Throwable =>
fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
shutdown()
System.exit(1)
@@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
server.shutdown()
}
catch {
- case e =>
+ case e: Throwable =>
fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
System.exit(1)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f551243..03ba60e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,7 +223,7 @@ class ReplicaManager(val config: KafkaConfig,
makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
leaderAndISRRequest.correlationId)
} catch {
- case e =>
+ case e: Throwable =>
val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
"epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.controllerEpoch, topicAndPartition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f1f0625..33b7360 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
}
if (leaderId != -1)
debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- case e2 =>
+ case e2: Throwable =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
leaderId = -1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 55709b5..c8023ee 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging {
try {
ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
} catch {
- case e => e.printStackTrace()
+ case e: Throwable => e.printStackTrace()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 7e424e7..747a675 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -86,7 +86,7 @@ object JmxTool extends Logging {
else
List(null)
- val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+ val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
val allAttributes: Iterable[(ObjectName, Array[String])] =
names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 6fb545a..f0f871c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -129,7 +129,7 @@ object MirrorMaker extends Logging {
try {
streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
} catch {
- case t =>
+ case t: Throwable =>
fatal("Unable to create stream - shutting down mirror maker.")
connectors.foreach(_.shutdown)
}
@@ -204,7 +204,7 @@ object MirrorMaker extends Logging {
}
}
} catch {
- case e =>
+ case e: Throwable =>
fatal("Stream unexpectedly exited.", e)
} finally {
shutdownLatch.countDown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 3cfa384..c889835 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging {
formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
numMessagesConsumed += 1
} catch {
- case e =>
+ case e: Throwable =>
if (skipMessageOnError)
error("Error processing message, skipping this message: ", e)
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations_2.8.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
new file mode 100644
index 0000000..28269eb
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
new file mode 100644
index 0000000..ab95ce1
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import scala.annotation.StaticAnnotation
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index f80b2cc..03fb06f 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -32,7 +32,7 @@ object Json extends Logging {
try {
JSON.parseFull(input)
} catch {
- case t =>
+ case t: Throwable =>
throw new KafkaException("Can't parse json string: %s".format(input), t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 64d84cc..db9f20b 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -64,7 +64,7 @@ object Mx4jLoader extends Logging {
case e: ClassNotFoundException => {
info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
}
- case e => {
+ case e: Throwable => {
warn("Could not start register mbean in JMX", e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9a86eab..9ddcde7 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,6 +19,7 @@ package kafka.utils
import java.util.ArrayList
import java.util.concurrent._
+import collection.mutable
import collection.JavaConversions
import kafka.common.KafkaException
import java.lang.Object
@@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
def remove(key: K): V = pool.remove(key)
- def keys = JavaConversions.asSet(pool.keySet())
+ def keys: mutable.Set[K] = {
+ import JavaConversions._
+ pool.keySet()
+ }
- def values: Iterable[V] =
- JavaConversions.asIterable(new ArrayList[V](pool.values()))
+ def values: Iterable[V] = {
+ import JavaConversions._
+ new ArrayList[V](pool.values())
+ }
def clear() { pool.clear() }
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e83eb5f..e0a5a27 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -67,7 +67,7 @@ object Utils extends Logging {
fun()
}
catch {
- case t =>
+ case t: Throwable =>
// log any error and the stack trace
error("error in loggedRunnable", t)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca1ce12..6eede1b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -271,7 +271,7 @@ object ZkUtils extends Logging {
storedData = readData(client, path)._1
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -281,7 +281,7 @@ object ZkUtils extends Logging {
info(path + " exists with value " + data + " during connection loss; this is ok")
}
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -321,7 +321,7 @@ object ZkUtils extends Logging {
case None => // the node disappeared; retry creating the ephemeral node immediately
}
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
}
@@ -360,10 +360,10 @@ object ZkUtils extends Logging {
} catch {
case e: ZkNodeExistsException =>
client.writeData(path, data)
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -416,7 +416,7 @@ object ZkUtils extends Logging {
createParentPath(client, path)
client.createEphemeral(path, data)
}
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -428,7 +428,7 @@ object ZkUtils extends Logging {
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
false
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -439,7 +439,7 @@ object ZkUtils extends Logging {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -449,7 +449,7 @@ object ZkUtils extends Logging {
zk.deleteRecursive(dir)
zk.close()
} catch {
- case _ => // swallow
+ case _: Throwable => // swallow
}
}
@@ -466,7 +466,7 @@ object ZkUtils extends Logging {
} catch {
case e: ZkNoNodeException =>
(None, stat)
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
dataAndStat
}
@@ -484,7 +484,7 @@ object ZkUtils extends Logging {
client.getChildren(path)
} catch {
case e: ZkNoNodeException => return Nil
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -675,7 +675,7 @@ object ZkUtils extends Logging {
case nne: ZkNoNodeException =>
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
- case e2 => throw new AdministrationException(e2.toString)
+ case e2: Throwable => throw new AdministrationException(e2.toString)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 06be990..2436289 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
fail("Topic should not exist")
} catch {
case e: AdministrationException => //this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
@@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
fail("Add partitions should fail")
} catch {
case e: AdministrationException => //this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index dc0013f..881e69b 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -38,7 +38,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// test wrong replication factor
@@ -48,7 +48,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// correct assignment
@@ -84,7 +84,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// non-exist brokers
@@ -95,7 +95,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// inconsistent replication factor
@@ -106,7 +106,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
catch {
case e: AdministrationException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
// good assignment
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
fail("shouldn't be able to create a topic already exists")
} catch {
case e: TopicExistsException => // this is good
- case e2 => throw e2
+ case e2: Throwable => throw e2
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..268d14e 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
fail("should get an exception")
} catch {
case e: ConsumerTimeoutException => // this is ok
- case e => throw e
+ case e: Throwable => throw e
}
}
@@ -406,10 +406,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
- import scala.collection.JavaConversions
+ import scala.collection.JavaConversions._
val children = zkClient.getChildren(path)
Collections.sort(children)
- val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+ val childrenAsSeq : Seq[java.lang.String] = (children: mutable.Buffer[String]).toSeq
childrenAsSeq.map(partition =>
(partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..e8e454f 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
messages ++= ms
import scala.collection.JavaConversions._
- javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
+ javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
}
javaProducer.close
messages
@@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def getMessages(nMessagesPerThread: Int,
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
var messages: List[String] = Nil
- val topicMessageStreams = asMap(jTopicMessageStreams)
+ import scala.collection.JavaConversions._
+ val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
http://git-wip-us.apache.org/repos/asf/kafka/blob/c12d2ea9/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index abee11b..726399e 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -29,8 +29,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
import scala.collection.JavaConversions._
- val messages = asIterable(messageSet)
- messages.map(m => m.message).iterator
+ messageSet.map(m => m.message).iterator
}
@Test
@@ -44,7 +43,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
import scala.collection.JavaConversions._
val m = createMessageSet(messages)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+ TestUtils.checkEquals(m.iterator, m.iterator)
}
@Test
@@ -52,7 +51,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
import scala.collection.JavaConversions._
val m = createMessageSet(messages, DefaultCompressionCodec)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+ TestUtils.checkEquals(m.iterator, m.iterator)
}
@Test