Posted to commits@kafka.apache.org by ne...@apache.org on 2012/01/24 21:54:13 UTC
svn commit: r1235492 [1/2] - in /incubator/kafka/branches/0.8:
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/
core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/
core/src/main/scala/kafka/consumer/ core/src/main/scala/...
Author: nehanarkhede
Date: Tue Jan 24 20:54:11 2012
New Revision: 1235492
URL: http://svn.apache.org/viewvc?rev=1235492&view=rev
Log:
KAFKA-238 getTopicMetadata API; patched by nehanarkhede; reviewed by junrao
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PartitionMetaData.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
incubator/kafka/branches/0.8/project/build.properties
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Jan 24 20:54:11 2012
@@ -20,9 +20,12 @@ package kafka.admin
import java.util.Random
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.utils.{SystemTime, Utils, ZkUtils}
+import kafka.api.{TopicMetadata, PartitionMetadata}
+import kafka.utils.{Logging, SystemTime, Utils, ZkUtils}
+import kafka.cluster.Broker
+import collection.mutable.HashMap
-object AdminUtils {
+object AdminUtils extends Logging {
val rand = new Random
/**
@@ -43,7 +46,7 @@ object AdminUtils {
* p3 p4 p0 p1 p2 (3nd replica)
* p7 p8 p9 p5 p6 (3nd replica)
*/
- def assginReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
+ def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
fixedStartIndex: Int = -1) // for testing only
: Array[List[String]] = {
if (nPartitions <= 0)
@@ -75,8 +78,9 @@ object AdminUtils {
val topicVersion = SystemTime.milliseconds
ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
for (i <- 0 until replicaAssignmentList.size) {
- val zkPath = ZkUtils.getTopicPartReplicasPath(topic, i.toString)
+ val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
+ debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
}
}
catch {
@@ -88,24 +92,47 @@ object AdminUtils {
}
}
- def getTopicMetaDataFromZK(topic: String, zkClient: ZkClient): Option[Seq[PartitionMetaData]] = {
- if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
- return None
-
- val topicPartitionsPath = ZkUtils.getTopicPartsPath(topic)
- val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).sortWith((s,t) => s.toInt < t.toInt)
- val ret = new Array[PartitionMetaData](partitions.size)
- for (i <-0 until ret.size) {
- val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartReplicasPath(topic, partitions(i)))
- val inSync = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartInSyncPath(topic, partitions(i)))
- val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartLeaderPath(topic, partitions(i)))
- ret(i) = new PartitionMetaData(partitions(i),
- Utils.getCSVList(replicas).toList,
- Utils.getCSVList(inSync).toList,
- if (leader == null) None else Some(leader)
- )
+ def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
+ val cachedBrokerInfo = new HashMap[Int, Broker]()
+
+ val metadataList = topics.map { topic =>
+ if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
+ val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic)
+ val partitionMetadata = new Array[PartitionMetadata](partitions.size)
+
+ for (i <-0 until partitionMetadata.size) {
+ val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
+ val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+ val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitions(i).toString))
+ debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+
+ partitionMetadata(i) = new PartitionMetadata(partitions(i),
+ if (leader == null) None else Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(leader.toInt)).head),
+ getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
+ getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+ None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+ }
+ Some(new TopicMetadata(topic, partitionMetadata))
+ } else
+ None
+ }
+
+ metadataList.toList
+ }
+
+ private def getBrokerInfoFromCache(zkClient: ZkClient,
+ cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
+ brokerIds: Seq[Int]): Seq[Broker] = {
+ brokerIds.map { id =>
+ val optionalBrokerInfo = cachedBrokerInfo.get(id)
+ optionalBrokerInfo match {
+ case Some(brokerInfo) => brokerInfo // return broker info from the cache
+ case None => // fetch it from zookeeper
+ val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
+ cachedBrokerInfo += (id -> brokerInfo)
+ brokerInfo
+ }
}
- Some(ret)
}
private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
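For reference, a minimal sketch (not part of r1235492) of calling the new batched getTopicMetaDataFromZK above. It assumes the patched 0.8 classes on the classpath and a ZooKeeper at localhost:2181; the connection settings and topic names are placeholders, and the hypothetical kafka.examples package is used only because PartitionMetadata exposes the private[kafka] Broker type.

package kafka.examples

import org.I0Itec.zkclient.ZkClient
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer

object TopicMetadataLookupExample {
  def main(args: Array[String]) {
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      val topics = List("topic1", "topic2")
      // one call covers all topics; broker lookups inside are memoized
      // per call through getBrokerInfoFromCache
      val metadata = AdminUtils.getTopicMetaDataFromZK(topics, zkClient)
      for ((topic, tmdOpt) <- topics.zip(metadata)) tmdOpt match {
        case Some(tmd) =>
          for (pmd <- tmd.partitionsMetadata)
            println("topic %s, partition %d, leader %s".format(topic, pmd.partitionId, pmd.leader))
        case None => println("topic " + topic + " doesn't exist!")
      }
    } finally {
      zkClient.close()
    }
  }
}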
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Tue Jan 24 20:54:11 2012
@@ -44,9 +44,10 @@ object CreateTopicCommand {
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
- val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manuallly assigning replicas to brokers")
+ val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
.withRequiredArg
- .describedAs("broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...")
+ .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+ "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
.ofType(classOf[String])
.defaultsTo("")
@@ -68,14 +69,7 @@ object CreateTopicCommand {
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
- val brokerList = ZkUtils.getSortedBrokerList(zkClient)
- var replicaAssignment: Seq[List[String]] = null
-
- if (replicaAssignmentStr == "")
- replicaAssignment = AdminUtils.assginReplicasToBrokers(brokerList, nPartitions, replicationFactor)
- else
- replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
- AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+ createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
}
catch {
@@ -89,11 +83,22 @@ object CreateTopicCommand {
}
}
+ def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+ val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ var replicaAssignment: Seq[List[String]] = null
+
+ if (replicaAssignmentStr == "")
+ replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
+ else
+ replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+ AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+ }
+
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new Array[List[String]](partitionList.size)
for (i <- 0 until partitionList.size) {
- val brokerList = partitionList(i).split(":")
+ val brokerList = partitionList(i).split(":").map(s => s.trim())
if (brokerList.size <= 0)
throw new AdministrationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
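A usage sketch of the extracted createTopic helper (again not in the patch; the broker ids, topic names, and ZooKeeper address are invented). The second call exercises the new trim() above, so whitespace around ':' and ',' in a manual assignment no longer breaks parsing.

import org.I0Itec.zkclient.ZkClient
import kafka.admin.CreateTopicCommand
import kafka.utils.ZKStringSerializer

object CreateTopicExample {
  def main(args: Array[String]) {
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      // automatic assignment: 4 partitions, replication factor 2
      CreateTopicCommand.createTopic(zkClient, "test-topic", 4, 2)
      // manual assignment: partition 0 -> brokers 0,1; partition 1 -> brokers 1,2
      CreateTopicCommand.createTopic(zkClient, "manual-topic",
        replicaAssignmentStr = "0 : 1 , 1 : 2")
    } finally {
      zkClient.close()
    }
  }
}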
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Tue Jan 24 20:54:11 2012
@@ -76,13 +76,13 @@ object ListTopicCommand {
}
def showTopic(topic: String, zkClient: ZkClient) {
- val topicMetaData = AdminUtils.getTopicMetaDataFromZK(topic, zkClient)
+ val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
topicMetaData match {
case None =>
println("topic " + topic + " doesn't exist!")
case Some(tmd) =>
println("topic: " + topic)
- for (part <- tmd)
+ for (part <- tmd.partitionsMetadata)
println(part.toString)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Tue Jan 24 20:54:11 2012
@@ -38,7 +38,7 @@ class FetchRequest(val topic: String,
val maxSize: Int) extends Request(RequestKeys.Fetch) {
def writeTo(buffer: ByteBuffer) {
- Utils.writeShortString(buffer, topic, "UTF-8")
+ Utils.writeShortString(buffer, topic)
buffer.putInt(partition)
buffer.putLong(offset)
buffer.putInt(maxSize)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiFetchRequest.scala Tue Jan 24 20:54:11 2012
@@ -19,8 +19,6 @@ package kafka.api
import java.nio._
import kafka.network._
-import kafka.utils._
-import kafka.api._
object MultiFetchRequest {
def readFrom(buffer: ByteBuffer): MultiFetchRequest = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Tue Jan 24 20:54:11 2012
@@ -62,7 +62,7 @@ class OffsetRequest(val topic: String,
val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
def writeTo(buffer: ByteBuffer) {
- Utils.writeShortString(buffer, topic, "UTF-8")
+ Utils.writeShortString(buffer, topic)
buffer.putInt(partition)
buffer.putLong(time)
buffer.putInt(maxNumOffsets)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Tue Jan 24 20:54:11 2012
@@ -41,7 +41,7 @@ class ProducerRequest(val topic: String,
val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
def writeTo(buffer: ByteBuffer) {
- Utils.writeShortString(buffer, topic, "UTF-8")
+ Utils.writeShortString(buffer, topic)
buffer.putInt(partition)
buffer.putInt(messages.serialized.limit)
buffer.put(messages.serialized)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Tue Jan 24 20:54:11 2012
@@ -23,4 +23,5 @@ object RequestKeys {
val MultiFetch: Short = 2
val MultiProduce: Short = 3
val Offsets: Short = 4
+ val TopicMetadata: Short = 5
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import kafka.cluster.Broker
+import java.nio.ByteBuffer
+import kafka.utils.Utils._
+import collection.mutable.ListBuffer
+
+/**
+ * topic (2 bytes + topic.length)
+ * number of partitions (4 bytes)
+ *
+ * partition id (4 bytes)
+ *
+ * does leader exist (1 byte)
+ * leader info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * number of replicas (2 bytes)
+ * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * number of in sync replicas (2 bytes)
+ * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ *
+ * does log metadata exist (1 byte)
+ * number of log segments (4 bytes)
+ * total size of log in bytes (8 bytes)
+ *
+ * number of log segments (4 bytes)
+ * beginning offset (8 bytes)
+ * last modified timestamp (8 bytes)
+ * size of log segment (8 bytes)
+ *
+ */
+
+sealed trait LeaderRequest { def requestId: Byte }
+case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
+case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
+
+sealed trait LogSegmentMetadataRequest { def requestId: Byte }
+case object LogSegmentMetadataExists extends LogSegmentMetadataRequest { val requestId: Byte = 1 }
+case object LogSegmentMetadataDoesNotExist extends LogSegmentMetadataRequest { val requestId: Byte = 0 }
+
+object TopicMetadata {
+
+ def readFrom(buffer: ByteBuffer): TopicMetadata = {
+ val topic = readShortString(buffer)
+ val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
+ val partitionsMetadata = new ListBuffer[PartitionMetadata]()
+ for(i <- 0 until numPartitions)
+ partitionsMetadata += PartitionMetadata.readFrom(buffer)
+ new TopicMetadata(topic, partitionsMetadata)
+ }
+}
+
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata]) {
+ def sizeInBytes: Int = {
+ var size: Int = shortStringLength(topic)
+ size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
+ debug("Size of topic metadata = " + size)
+ size
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ /* topic */
+ writeShortString(buffer, topic)
+ /* number of partitions */
+ buffer.putInt(partitionsMetadata.size)
+ partitionsMetadata.foreach(m => m.writeTo(buffer))
+ }
+}
+
+object PartitionMetadata {
+
+ def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+ val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
+ val doesLeaderExist = getLeaderRequest(buffer.get)
+ val leader = doesLeaderExist match {
+ case LeaderExists => /* leader exists */
+ Some(Broker.readFrom(buffer))
+ case LeaderDoesNotExist => None
+ }
+
+ /* list of all replicas */
+ val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
+ val replicas = new Array[Broker](numReplicas)
+ for(i <- 0 until numReplicas) {
+ replicas(i) = Broker.readFrom(buffer)
+ }
+
+ /* list of in-sync replicas */
+ val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
+ val isr = new Array[Broker](numIsr)
+ for(i <- 0 until numIsr) {
+ isr(i) = Broker.readFrom(buffer)
+ }
+
+ val doesLogMetadataExist = getLogSegmentMetadataRequest(buffer.get)
+ val logMetadata = doesLogMetadataExist match {
+ case LogSegmentMetadataExists =>
+ val numLogSegments = getIntInRange(buffer, "total number of log segments", (0, Int.MaxValue))
+ val totalDataSize = getLongInRange(buffer, "total data size", (0, Long.MaxValue))
+ val numSegmentMetadata = getIntInRange(buffer, "number of log segment metadata", (0, Int.MaxValue))
+ val segmentMetadata = numSegmentMetadata match {
+ case 0 => None
+ case _ =>
+ val metadata = new ListBuffer[LogSegmentMetadata]()
+ for(i <- 0 until numSegmentMetadata) {
+ val beginningOffset = getLongInRange(buffer, "beginning offset", (0, Long.MaxValue))
+ val lastModified = getLongInRange(buffer, "last modified time", (0, Long.MaxValue))
+ val size = getLongInRange(buffer, "size of log segment", (0, Long.MaxValue))
+ metadata += new LogSegmentMetadata(beginningOffset, lastModified, size)
+ }
+ Some(metadata)
+ }
+ Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
+ case LogSegmentMetadataDoesNotExist => None
+ }
+ new PartitionMetadata(partitionId, leader, replicas, isr, logMetadata)
+ }
+
+ def getLeaderRequest(requestId: Byte): LeaderRequest = {
+ requestId match {
+ case LeaderExists.requestId => LeaderExists
+ case LeaderDoesNotExist.requestId => LeaderDoesNotExist
+ case _ => throw new IllegalArgumentException("Unknown leader request id " + requestId)
+ }
+ }
+
+ def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
+ requestId match {
+ case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
+ case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
+ }
+ }
+}
+
+case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker],
+ logMetadata: Option[LogMetadata]) {
+ def sizeInBytes: Int = {
+ var size: Int = 4 /* partition id */ + 1 /* if leader exists*/
+
+ leader match {
+ case Some(l) => size += l.sizeInBytes
+ case None =>
+ }
+
+ size += 2 /* number of replicas */
+ size += replicas.foldLeft(0)(_ + _.sizeInBytes)
+ size += 2 /* number of in sync replicas */
+ size += isr.foldLeft(0)(_ + _.sizeInBytes)
+
+ size += 1 /* if log segment metadata exists */
+ logMetadata match {
+ case Some(metadata) => size += metadata.sizeInBytes
+ case None =>
+ }
+ debug("Size of partition metadata = " + size)
+ size
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(partitionId)
+
+ /* if leader exists*/
+ leader match {
+ case Some(l) =>
+ buffer.put(LeaderExists.requestId)
+ /* leader id host_name port */
+ l.writeTo(buffer)
+ case None => buffer.put(LeaderDoesNotExist.requestId)
+ }
+
+ /* number of replicas */
+ buffer.putShort(replicas.size.toShort)
+ replicas.foreach(r => r.writeTo(buffer))
+
+ /* number of in-sync replicas */
+ buffer.putShort(isr.size.toShort)
+ isr.foreach(r => r.writeTo(buffer))
+
+ /* if log segment metadata exists */
+ logMetadata match {
+ case Some(metadata) =>
+ buffer.put(LogSegmentMetadataExists.requestId)
+ metadata.writeTo(buffer)
+ case None => buffer.put(LogSegmentMetadataDoesNotExist.requestId)
+ }
+
+ }
+}
+
+case class LogMetadata(numLogSegments: Int, totalSize: Long, logSegmentMetadata: Option[Seq[LogSegmentMetadata]]) {
+ def sizeInBytes: Int = {
+ var size: Int = 4 /* num log segments */ + 8 /* total data size */ + 4 /* number of log segment metadata */
+ logSegmentMetadata match {
+ case Some(segmentMetadata) => size += segmentMetadata.foldLeft(0)(_ + _.sizeInBytes)
+ case None =>
+ }
+ debug("Size of log metadata = " + size)
+ size
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(numLogSegments)
+ buffer.putLong(totalSize)
+ /* if segment metadata exists */
+ logSegmentMetadata match {
+ case Some(segmentMetadata) =>
+ /* number of log segments */
+ buffer.putInt(segmentMetadata.size)
+ segmentMetadata.foreach(m => m.writeTo(buffer))
+ case None =>
+ buffer.putInt(0)
+ }
+ }
+}
+
+case class LogSegmentMetadata(beginningOffset: Long, lastModified: Long, size: Long) {
+ def sizeInBytes: Int = {
+ 8 /* beginning offset */ + 8 /* last modified timestamp */ + 8 /* log segment size in bytes */
+ }
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putLong(beginningOffset)
+ buffer.putLong(lastModified)
+ buffer.putLong(size)
+ }
+}
+
+
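To make the wire format documented at the top of this file concrete, here is a small serialize/deserialize round trip (illustrative only; broker values are invented, and it sits in a hypothetical kafka.examples package because Broker is private[kafka]):

package kafka.examples

import java.nio.ByteBuffer
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.cluster.Broker

object TopicMetadataWireFormatExample {
  def main(args: Array[String]) {
    val broker0 = new Broker(0, "creator-0", "host0", 9092)
    val broker1 = new Broker(1, "creator-1", "host1", 9092)
    val partition = new PartitionMetadata(0, Some(broker0),
      List(broker0, broker1), List(broker0), None /* no log segment metadata */)
    val original = new TopicMetadata("test-topic", List(partition))

    // sizeInBytes is exactly the number of bytes writeTo produces
    val buffer = ByteBuffer.allocate(original.sizeInBytes)
    original.writeTo(buffer)
    buffer.rewind()

    val decoded = TopicMetadata.readFrom(buffer)
    println(decoded.topic)                                    // test-topic
    println(decoded.partitionsMetadata.head.leader.isDefined) // true
  }
}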
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.Utils._
+import kafka.network.{Send, Request}
+import java.nio.channels.GatheringByteChannel
+import kafka.common.ErrorMapping
+import collection.mutable.ListBuffer
+
+sealed trait DetailedMetadataRequest { def requestId: Short }
+case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
+case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
+
+object TopicMetadataRequest {
+
+ /**
+ * TopicMetadataRequest has the following format -
+ *
+ * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
+ *
+ * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
+ * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
+ * all partitions of the list of topics mentioned in the request.
+ */
+ def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
+ requestId match {
+ case SegmentMetadata.requestId => SegmentMetadata
+ case NoSegmentMetadata.requestId => NoSegmentMetadata
+ case _ => throw new IllegalArgumentException("Unknown detailed metadata request id " + requestId)
+ }
+ }
+
+ def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
+ val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+ val topics = new ListBuffer[String]()
+ for(i <- 0 until numTopics)
+ topics += readShortString(buffer, "UTF-8")
+ val topicsList = topics.toList
+ val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
+ var timestamp: Option[Long] = None
+ var count: Option[Int] = None
+ returnDetailedMetadata match {
+ case NoSegmentMetadata =>
+ case SegmentMetadata =>
+ timestamp = Some(buffer.getLong)
+ count = Some(buffer.getInt)
+ case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request "
+ + returnDetailedMetadata.requestId)
+ }
+ debug("topic = %s, detailed metadata request = %d"
+ .format(topicsList.head, returnDetailedMetadata.requestId))
+ new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count)
+ }
+
+ def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
+ val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
+ val buffer = ByteBuffer.allocate(size)
+ /* number of topics */
+ buffer.putInt(topicMetadata.size)
+ /* topic partition_metadata */
+ topicMetadata.foreach(m => m.writeTo(buffer))
+ buffer.rewind()
+ buffer
+ }
+
+ def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = {
+ /* number of topics */
+ val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+ val topicMetadata = new Array[TopicMetadata](numTopics)
+ for(i <- 0 until numTopics)
+ topicMetadata(i) = TopicMetadata.readFrom(buffer)
+ topicMetadata
+ }
+}
+
+case class TopicMetadataRequest(val topics: Seq[String],
+ val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
+ val timestamp: Option[Long] = None, val count: Option[Int] = None)
+ extends Request(RequestKeys.TopicMetadata){
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(topics.size)
+ topics.foreach(topic => writeShortString(buffer, topic))
+ buffer.putShort(detailedMetadata.requestId)
+ detailedMetadata match {
+ case SegmentMetadata =>
+ buffer.putLong(timestamp.get)
+ buffer.putInt(count.get)
+ case NoSegmentMetadata =>
+ case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+ }
+ }
+
+ def sizeInBytes(): Int = {
+ var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
+ 2 /* detailed metadata */
+ detailedMetadata match {
+ case SegmentMetadata =>
+ size += 8 /* timestamp */ + 4 /* count */
+ case NoSegmentMetadata =>
+ case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+ }
+ size
+ }
+}
+
+class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
+ private var size: Int = topicsMetadata.foldLeft(0)(_ + _.sizeInBytes)
+ private val header = ByteBuffer.allocate(6)
+ val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
+ header.putInt(size + 2)
+ header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
+ header.rewind()
+
+ var complete: Boolean = false
+
+ def writeTo(channel: GatheringByteChannel): Int = {
+ expectIncomplete()
+ var written = 0
+ if(header.hasRemaining)
+ written += channel.write(header)
+ if(!header.hasRemaining && metadata.hasRemaining)
+ written += channel.write(metadata)
+
+ if(!metadata.hasRemaining)
+ complete = true
+ written
+ }
+}
\ No newline at end of file
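A short sketch of sizing, writing, and re-reading a TopicMetadataRequest at the default NoSegmentMetadata detail level (topic names are placeholders; with NoSegmentMetadata neither timestamp nor count goes on the wire, so neither is read back):

package kafka.examples  // hypothetical package; keeps private[kafka] types accessible

import java.nio.ByteBuffer
import kafka.api.{TopicMetadataRequest, NoSegmentMetadata}

object TopicMetadataRequestExample {
  def main(args: Array[String]) {
    val request = new TopicMetadataRequest(List("topic1", "topic2"))
    // 4 (topic count) + (2 + length) per topic + 2 (detailedMetadata)
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val decoded = TopicMetadataRequest.readFrom(buffer)
    println(decoded.topics)                                // List(topic1, topic2)
    println(decoded.detailedMetadata == NoSegmentMetadata) // true
  }
}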
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Tue Jan 24 20:54:11 2012
@@ -17,28 +17,49 @@
package kafka.cluster
-import java.util.Arrays
-import kafka.utils._
-import java.net.InetAddress
-import kafka.server.KafkaConfig
-import util.parsing.json.JSON
+import kafka.utils.Utils._
+import java.nio.ByteBuffer
/**
* A Kafka broker
*/
private[kafka] object Broker {
+
def createBroker(id: Int, brokerInfoString: String): Broker = {
+ if(brokerInfoString == null)
+ throw new IllegalArgumentException("Broker id %s does not exist".format(id))
val brokerInfo = brokerInfoString.split(":")
new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
}
+
+ def readFrom(buffer: ByteBuffer): Broker = {
+ val id = buffer.getInt
+ val creatorId = readShortString(buffer)
+ val host = readShortString(buffer)
+ val port = buffer.getInt
+ new Broker(id, creatorId, host, port)
+ }
}
-private[kafka] class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
-
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(id)
+ writeShortString(buffer, creatorId)
+ writeShortString(buffer, host)
+ buffer.putInt(port)
+ }
+
+ def sizeInBytes: Int = {
+ val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+ debug("Size of broker info = " + size)
+ size
+ }
+
override def equals(obj: Any): Boolean = {
obj match {
case null => false
@@ -47,6 +68,6 @@ private[kafka] class Broker(val id: Int,
}
}
- override def hashCode(): Int = Utils.hashcode(id, host, port)
+ override def hashCode(): Int = hashcode(id, host, port)
}
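The case-class conversion plus the new readFrom/writeTo pair give Broker a simple binary round trip. A sketch (field values invented; in a hypothetical kafka.examples package since Broker is private[kafka]):

package kafka.examples

import java.nio.ByteBuffer
import kafka.cluster.Broker

object BrokerSerdeExample {
  def main(args: Array[String]) {
    val broker = new Broker(1, "creator-1", "host1", 9092)
    val buffer = ByteBuffer.allocate(broker.sizeInBytes)
    broker.writeTo(buffer)   // id, creatorId, host, port
    buffer.rewind()
    println(Broker.readFrom(buffer) == broker)  // true
  }
}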
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Cluster.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
package kafka.cluster
-import kafka.utils._
import scala.collection._
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
package kafka.common
-import kafka.consumer._
import kafka.message.InvalidMessageException
import java.nio.ByteBuffer
import java.lang.Throwable
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaZookeperClient.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, ZKConfig}
+import java.util.concurrent.atomic.AtomicReference
+
+object KafkaZookeeperClient {
+ private val INSTANCE = new AtomicReference[ZkClient](null)
+
+ def getZookeeperClient(config: ZKConfig): ZkClient = {
+ // TODO: This cannot be a singleton since unit tests break if we do that
+// INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+// ZKStringSerializer))
+ INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ ZKStringSerializer))
+ INSTANCE.get()
+ }
+}
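A minimal usage sketch for the new factory, assuming ZKConfig is still constructed from a java.util.Properties carrying a zk.connect entry as elsewhere in the 0.8 tree; the connect string is a placeholder. Per the TODO above, each call currently replaces the shared instance rather than reusing it, so unit tests get a fresh client.

import java.util.Properties
import kafka.common.KafkaZookeeperClient
import kafka.utils.ZKConfig

object ZkClientFactoryExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("zk.connect", "localhost:2181")
    val zkClient = KafkaZookeeperClient.getZookeeperClient(new ZKConfig(props))
    println(zkClient != null)  // true
    zkClient.close()
  }
}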
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Jan 24 20:54:11 2012
@@ -21,13 +21,11 @@ import scala.collection.mutable._
import scala.collection.JavaConversions._
import org.I0Itec.zkclient._
import joptsimple._
-import java.util.Arrays.asList
import java.util.Properties
import java.util.Random
import java.io.PrintStream
import kafka.message._
import kafka.utils.{Utils, Logging}
-import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Tue Jan 24 20:54:11 2012
@@ -18,7 +18,7 @@
package kafka.consumer
import scala.collection.JavaConversions._
-import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.server.KafkaServerStartable
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Tue Jan 24 20:54:11 2012
@@ -54,8 +54,7 @@ class OracleOffsetStorage(val connection
} finally {
commitOrRollback(connection, success)
}
- if(logger.isDebugEnabled)
- logger.debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
+ debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
}
def close() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Tue Jan 24 20:54:11 2012
@@ -16,10 +16,8 @@
*/
package kafka.javaapi
-import java.nio.ByteBuffer
import kafka.serializer.Encoder
-import kafka.producer.{ProducerConfig, ProducerPool}
-import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+import kafka.producer.async.QueueItem
import kafka.utils.Logging
private[javaapi] object Implicits extends Logging {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Tue Jan 24 20:54:11 2012
@@ -17,8 +17,7 @@
package kafka.javaapi.message
-import java.nio.channels._
-import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+import kafka.message.{MessageAndOffset, InvalidMessageException}
/**
* A set of messages. A message set has a fixed serialized form, though the container
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/Producer.scala Tue Jan 24 20:54:11 2012
@@ -54,7 +54,6 @@ class Producer[K,V](config: ProducerConf
* partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
* object in the send API
*/
- import kafka.javaapi.Implicits._
def this(config: ProducerConfig,
encoder: Encoder[V],
eventHandler: kafka.javaapi.producer.async.EventHandler[V],
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Jan 24 20:54:11 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLat
import kafka.server.{KafkaConfig, KafkaZooKeeper}
import kafka.common.{InvalidTopicException, InvalidPartitionException}
import kafka.api.OffsetRequest
+import org.I0Itec.zkclient.ZkClient
/**
* The guy who creates and hands out logs
@@ -38,15 +39,16 @@ private[kafka] class LogManager(val conf
needRecovery: Boolean) extends Logging {
val logDir: File = new File(config.logDir)
+ var kafkaZookeeper = new KafkaZooKeeper(config, this)
+
private val numPartitions = config.numPartitions
private val maxSize: Long = config.logFileSize
private val flushInterval = config.flushInterval
private val topicPartitionsMap = config.topicPartitionsMap
private val logCreationLock = new Object
private val random = new java.util.Random
- private var kafkaZookeeper: KafkaZooKeeper = null
private var zkActor: Actor = null
- private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
+ private val startupLatch: CountDownLatch = new CountDownLatch(1)
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
@@ -82,29 +84,26 @@ private[kafka] class LogManager(val conf
scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
}
- if(config.enableZookeeper) {
- kafkaZookeeper = new KafkaZooKeeper(config, this)
- kafkaZookeeper.startup
- zkActor = new Actor {
- def act() {
- loop {
- receive {
- case topic: String =>
- try {
- kafkaZookeeper.registerTopicInZk(topic)
- }
- catch {
- case e => error(e) // log it and let it go
- }
- case StopActor =>
- info("zkActor stopped")
- exit
- }
+ kafkaZookeeper.startup
+ zkActor = new Actor {
+ def act() {
+ loop {
+ receive {
+ case topic: String =>
+ try {
+ kafkaZookeeper.registerTopicInZk(topic)
+ }
+ catch {
+ case e => error(e) // log it and let it go
+ }
+ case StopActor =>
+ info("zkActor stopped")
+ exit
}
}
}
- zkActor.start
}
+ zkActor.start
case object StopActor
@@ -119,24 +118,20 @@ private[kafka] class LogManager(val conf
* Register this broker in ZK for the first time.
*/
def startup() {
- if(config.enableZookeeper) {
- kafkaZookeeper.registerBrokerInZk()
- for (topic <- getAllTopics)
- kafkaZookeeper.registerTopicInZk(topic)
- startupLatch.countDown
- }
+ kafkaZookeeper.registerBrokerInZk()
+ for (topic <- getAllTopics)
+ kafkaZookeeper.registerTopicInZk(topic)
+ startupLatch.countDown
info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
}
private def awaitStartup() {
- if (config.enableZookeeper)
- startupLatch.await
+ startupLatch.await
}
private def registerNewTopicInZK(topic: String) {
- if (config.enableZookeeper)
- zkActor ! topic
+ zkActor ! topic
}
/**
@@ -288,10 +283,8 @@ private[kafka] class LogManager(val conf
val iter = getLogIterator
while(iter.hasNext)
iter.next.close()
- if (config.enableZookeeper) {
- zkActor ! StopActor
- kafkaZookeeper.close
- }
+ zkActor ! StopActor
+ kafkaZookeeper.close
}
private def getLogIterator(): Iterator[Log] = {
@@ -345,4 +338,7 @@ private[kafka] class LogManager(val conf
def getAllTopics(): Iterator[String] = logs.keys.iterator
def getTopicPartitionsMap() = topicPartitionsMap
+
+ def getServerConfig: KafkaConfig = config
+ def getZookeeperClient: ZkClient = kafkaZookeeper.zkClient
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@ package kafka.message
import java.io.InputStream
import java.nio.ByteBuffer
-import scala.Math
class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
override def read():Int = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
package kafka.message
-import scala.collection.mutable
import kafka.utils.Logging
import kafka.common.{InvalidMessageSizeException, ErrorMapping}
import java.nio.ByteBuffer
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala Tue Jan 24 20:54:11 2012
@@ -54,8 +54,8 @@ class GZIPCompression(inputStream: Input
}
class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream) extends CompressionFacade(inputStream,outputStream) {
- import org.xerial.snappy.{SnappyInputStream}
- import org.xerial.snappy.{SnappyOutputStream}
+ import org.xerial.snappy.SnappyInputStream
+ import org.xerial.snappy.SnappyOutputStream
val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream)
val snappyOut:SnappyOutputStream = if (outputStream == null) null else new SnappyOutputStream(outputStream)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Tue Jan 24 20:54:11 2012
@@ -22,8 +22,6 @@ import java.nio._
import java.nio.channels._
import java.util.concurrent.atomic._
-import kafka._
-import kafka.message._
import kafka.utils._
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Tue Jan 24 20:54:11 2012
@@ -18,9 +18,6 @@
package kafka.message
import java.nio._
-import java.nio.channels._
-import java.util.zip.CRC32
-import java.util.UUID
import kafka.utils._
import kafka.common.UnknownMagicByteException
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala Tue Jan 24 20:54:11 2012
@@ -26,7 +26,7 @@ import kafka.utils._
*
*/
@nonthreadsafe
-private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive {
+private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
private var contentBuffer: ByteBuffer = null
@@ -81,7 +81,7 @@ private[kafka] class BoundedByteBufferRe
}
catch {
case e: OutOfMemoryError => {
- logger.error("OOME with size " + size, e)
+ error("OOME with size " + size, e)
throw e
}
case e2 =>
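This hunk swaps direct logger calls for the helpers supplied by mixing in kafka.utils.Logging. A plausible shape for that trait (a hedged sketch, not the actual kafka.utils.Logging source) is:

    import org.apache.log4j.Logger

    trait Logging {
      private val logger = Logger.getLogger(this.getClass.getName)
      def trace(msg: => String) { if (logger.isTraceEnabled) logger.trace(msg) }
      def debug(msg: => String) { if (logger.isDebugEnabled) logger.debug(msg) }
      def info(msg: => String)  { logger.info(msg) }
      def error(msg: => String) { logger.error(msg) }
      def error(msg: => String, e: Throwable) { logger.error(msg, e) }
    }

Because msg is a by-name parameter, trace and debug can perform the level check inside the trait before the message string is ever built, so call sites no longer need explicit logger.isTraceEnabled guards; that is also why those guards are dropped in the AsyncProducer hunk further down.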
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
package kafka.network
-import java.util.ArrayList
import java.util.concurrent._
object RequestChannel {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Jan 24 20:54:11 2012
@@ -21,7 +21,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import java.net._
import java.io._
-import java.nio._
import java.nio.channels._
import kafka.utils._
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala Tue Jan 24 20:54:11 2012
@@ -17,8 +17,6 @@
package kafka.network
-import java.util.concurrent.atomic._
-import javax.management._
import kafka.utils._
import kafka.api.RequestKeys
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Jan 24 20:54:11 2012
@@ -16,7 +16,7 @@
*/
package kafka.producer
-import collection.mutable.Map
+import collection.immutable.Map
import collection.SortedSet
import kafka.cluster.{Broker, Partition}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,7 @@
package kafka.producer
import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.immutable.Map
import collection.SortedSet
import kafka.cluster.{Broker, Partition}
import kafka.common.InvalidConfigException
@@ -90,7 +90,7 @@ private[producer] class ConfigBrokerPart
brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
brokerIdHostPort(1), brokerIdHostPort(2).toInt))
}
- brokerInfo
+ brokerInfo.toMap
}
}
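The pattern in this file (and in the BrokerPartitionInfo and ZKBrokerPartitionInfo hunks): keep building with a mutable HashMap internally, but narrow the public signature to collection.immutable.Map and snapshot with .toMap on the way out. A minimal standalone sketch of the same idiom, with a hypothetical parseBrokers helper and an assumed "id:host:port" entry format:

    import scala.collection.mutable.HashMap

    def parseBrokers(entries: Seq[String]): Map[Int, (String, Int)] = {
      val acc = new HashMap[Int, (String, Int)]()
      for (entry <- entries) {
        val pieces = entry.split(":")  // assumed "id:host:port" layout
        acc += (pieces(0).toInt -> (pieces(1), pieces(2).toInt))
      }
      acc.toMap  // immutable snapshot; callers cannot mutate internal state
    }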
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Tue Jan 24 20:54:11 2012
@@ -18,14 +18,10 @@
package kafka.producer
import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
import joptsimple._
-import java.util.Arrays.asList
import java.util.Properties
-import java.util.Random
import java.io._
import kafka.message._
-import kafka.utils._
import kafka.serializer._
object ConsoleProducer {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,7 @@ package kafka.producer
import async.MissingConfigException
import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{Logger, AppenderSkeleton}
+import org.apache.log4j.AppenderSkeleton
import kafka.utils.{Utils, Logging}
import kafka.serializer.Encoder
import java.util.{Properties, Date}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@ package kafka.producer
import kafka.utils.Utils
import java.util.Properties
-import kafka.message.{CompressionUtils, CompressionCodec}
class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
/** the broker to which the producer sends events */
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Tue Jan 24 20:54:11 2012
@@ -18,7 +18,7 @@ package kafka.producer
import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.immutable.Map
import kafka.utils.Logging
import collection.immutable.TreeSet
import kafka.cluster.{Broker, Partition}
@@ -188,13 +188,9 @@ private[producer] class ZKBrokerPartitio
* @return a mapping from brokerId to (host, port)
*/
private def getZKBrokerInfo(): Map[Int, Broker] = {
- val brokers = new HashMap[Int, Broker]()
val allBrokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).map(bid => bid.toInt)
- allBrokerIds.foreach { bid =>
- val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
- brokers += (bid -> Broker.createBroker(bid, brokerInfo))
- }
- brokers
+ val brokers = ZkUtils.getBrokerInfoFromIds(zkClient, allBrokerIds)
+ allBrokerIds.zip(brokers).toMap
}
/**
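The rewritten getZKBrokerInfo fetches all broker info in one call and pairs ids with results via zip; a minimal sketch of that idiom with stand-in values:

    val allBrokerIds = List(0, 1, 2)                      // hypothetical ids
    val brokers = allBrokerIds.map(id => "broker-" + id)  // stand-in for ZkUtils.getBrokerInfoFromIds
    val byId: Map[Int, String] = allBrokerIds.zip(brokers).toMap
    // byId == Map(0 -> "broker-0", 1 -> "broker-1", 2 -> "broker-2")

Note the implicit contract: zip is only correct if getBrokerInfoFromIds returns broker info in the same order as the ids it was given.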
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Tue Jan 24 20:54:11 2012
@@ -106,29 +106,26 @@ private[kafka] class AsyncProducer[T](co
if(!added) {
AsyncProducerStats.recordDroppedEvents
- logger.error("Event queue is full of unsent messages, could not send event: " + event.toString)
+ error("Event queue is full of unsent messages, could not send event: " + event.toString)
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
}else {
- if(logger.isTraceEnabled) {
- logger.trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
- logger.trace("Remaining queue size: " + queue.remainingCapacity)
- }
+ trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
+ trace("Remaining queue size: " + queue.remainingCapacity)
}
}
def close = {
if(cbkHandler != null) {
cbkHandler.close
- logger.info("Closed the callback handler")
+ info("Closed the callback handler")
}
closed.set(true)
queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
- if(logger.isDebugEnabled)
- logger.debug("Added shutdown command to the queue")
+ debug("Added shutdown command to the queue")
sendThread.shutdown
sendThread.awaitShutdown
producer.close
- logger.info("Closed AsyncProducer")
+ info("Closed AsyncProducer")
}
// for testing only
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Jan 24 20:54:11 2012
@@ -23,9 +23,11 @@ import kafka.network._
import kafka.message._
import kafka.api._
import kafka.common.ErrorMapping
-import kafka.utils.SystemTime
-import kafka.utils.Logging
import java.io.IOException
+import kafka.utils.{SystemTime, Logging}
+import collection.mutable.ListBuffer
+import kafka.admin.{CreateTopicCommand, AdminUtils}
+import java.lang.IllegalStateException
/**
* Logic to handle the various Kafka requests
@@ -42,6 +44,7 @@ class KafkaApis(val logManager: LogManag
case RequestKeys.MultiFetch => handleMultiFetchRequest(receive)
case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
case RequestKeys.Offsets => handleOffsetRequest(receive)
+ case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
}
}
@@ -129,4 +132,38 @@ class KafkaApis(val logManager: LogManag
val response = new OffsetArraySend(offsets)
Some(response)
}
+
+ def handleTopicMetadataRequest(request: Receive): Option[Send] = {
+ val metadataRequest = TopicMetadataRequest.readFrom(request.buffer)
+
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Topic metadata request " + metadataRequest.toString())
+
+ val topicsMetadata = new ListBuffer[TopicMetadata]()
+ val config = logManager.getServerConfig
+ val zkClient = logManager.getZookeeperClient
+ val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+
+ metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+ val topic = topicAndMetadata._1
+ topicAndMetadata._2 match {
+ case Some(metadata) => topicsMetadata += metadata
+ case None =>
+ /* check if auto creation of topics is turned on */
+ if(config.autoCreateTopics) {
+ CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
+ config.defaultReplicationFactor)
+ info("Auto creation of topic %s with partitions %d and replication factor %d is successful!"
+ .format(topic, config.numPartitions, config.defaultReplicationFactor))
+ val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+ newTopicMetadata match {
+ case Some(topicMetadata) => topicsMetadata += topicMetadata
+ case None =>
+ throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+ }
+ }
+ }
+ }
+ Some(new TopicMetadataSend(topicsMetadata))
+ }
}
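Stripped of the Kafka types, the control flow of handleTopicMetadataRequest is a lookup-or-create pass over Option results. The sketch below re-expresses it with hypothetical getMetadata/createTopic stand-ins (not Kafka APIs) and makes one behavior explicit: when auto creation is disabled, a missing topic is silently omitted from the response.

    import scala.collection.mutable.ListBuffer

    def metadataFor(topics: List[String],
                    getMetadata: String => Option[String],
                    createTopic: String => Unit,
                    autoCreate: Boolean): List[String] = {
      val result = new ListBuffer[String]()
      for (topic <- topics) {
        getMetadata(topic) match {
          case Some(md) => result += md
          case None if autoCreate =>
            createTopic(topic)
            result += getMetadata(topic).getOrElse(
              throw new IllegalStateException("metadata missing for auto-created topic " + topic))
          case None => // auto creation disabled: topic omitted, as in the handler above
        }
      }
      result.toList
    }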
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Jan 24 20:54:11 2012
@@ -79,9 +79,6 @@ class KafkaConfig(props: Properties) ext
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
- /* enable zookeeper registration in the server */
- val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
-
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
@@ -93,4 +90,10 @@ class KafkaConfig(props: Properties) ext
/* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+ /* enable auto creation of topic on the server */
+ val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+
+ /* default replication factor for automatically created topics */
+ val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
}
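The two new settings can be exercised with an ordinary Properties object; the property names below are taken verbatim from the hunk above, and the values shown are the coded defaults. A sketch only; a real KafkaConfig also needs the usual required broker properties, omitted here:

    import java.util.Properties

    val props = new Properties()
    props.put("auto.create.topics", "true")        // server-side auto creation (default: true)
    props.put("default.replication.factor", "1")   // replication for auto-created topics (default: 1)
    // val config = new KafkaConfig(props)         // plus the required broker settings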
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Tue Jan 24 20:54:11 2012
@@ -17,9 +17,6 @@
package kafka.server
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import org.apache.log4j.Logger
import kafka.network._
import kafka.utils._
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jan 24 20:54:11 2012
@@ -17,13 +17,11 @@
package kafka.server
-import scala.reflect.BeanProperty
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.io.File
-import org.apache.log4j.Logger
import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
-import kafka.network.{SocketServerStats, SocketServer, RequestChannel}
+import kafka.network.{SocketServerStats, SocketServer}
import kafka.log.LogManager
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Jan 24 20:54:11 2012
@@ -18,12 +18,11 @@
package kafka.server
import kafka.utils._
-import kafka.cluster.Broker
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.log.LogManager
import java.net.InetAddress
+import kafka.common.KafkaZookeeperClient
/**
* Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -41,7 +40,7 @@ class KafkaZooKeeper(config: KafkaConfig
def startup() {
/* start client */
info("connecting to ZK: " + config.zkConnect)
- zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ zkClient = KafkaZookeeperClient.getZookeeperClient(config)
zkClient.subscribeStateChanges(new SessionExpireListener)
}
@@ -49,17 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig
info("Registering broker " + brokerIdPath)
val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
val creatorId = hostName + "-" + System.currentTimeMillis
- val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
- try {
- ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
- } catch {
- case e: ZkNodeExistsException =>
- throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
- "indicates that you either have configured a brokerid that is already in use, or " +
- "else you have shutdown this broker and restarted it faster than the zookeeper " +
- "timeout so it appears to be re-registering.")
- }
- info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+ ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
}
def registerTopicInZk(topic: String) {
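registerBroker now delegates to ZkUtils.registerBrokerInZk, which presumably consolidates the removed logic: create an ephemeral znode for the broker and translate a name clash into a descriptive failure. A simplified sketch of that core using the zkclient calls directly (the removed ZkUtils.createEphemeralPathExpectConflict also creates missing parent paths, which this sketch skips):

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    def registerEphemeral(zkClient: ZkClient, path: String, data: String) {
      try {
        zkClient.createEphemeral(path, data)  // node lifetime is tied to this ZK session
      } catch {
        case e: ZkNodeExistsException =>
          // either a duplicate broker id, or a restart before the old session expired
          throw new RuntimeException("A broker is already registered on the path " + path)
      }
    }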
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala Tue Jan 24 20:54:11 2012
@@ -17,10 +17,7 @@
package kafka.server
-import java.nio._
-import java.nio.channels._
import kafka.network._
-import kafka.message._
import kafka.utils._
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/JmxTool.scala Tue Jan 24 20:54:11 2012
@@ -22,7 +22,7 @@ import java.util.Date
import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
-import joptsimple.{OptionSet, OptionParser}
+import joptsimple.OptionParser
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.math._
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ProducerShell.scala Tue Jan 24 20:54:11 2012
@@ -17,12 +17,9 @@
package kafka.tools
-import java.net.URI
import java.io._
import joptsimple._
-import kafka.message._
import kafka.producer._
-import java.util.Properties
import kafka.utils.Utils
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Tue Jan 24 20:54:11 2012
@@ -17,19 +17,17 @@
package kafka.tools
-import java.io.File
import joptsimple.OptionParser
-import org.apache.log4j.Logger
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.serializer.DefaultEncoder
import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Utils, Logging}
+import kafka.utils.{ZKStringSerializer, Logging}
import kafka.api.OffsetRequest
import org.I0Itec.zkclient._
-import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+import kafka.message.{CompressionCodec, Message}
object ReplayLogProducer extends Logging {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Tue Jan 24 20:54:11 2012
@@ -22,7 +22,6 @@ import joptsimple._
import kafka.api.FetchRequest
import kafka.utils._
import kafka.consumer._
-import kafka.server._
/**
* Command line program to dump out messages to standard out using the simple consumer
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@ package kafka.utils
import java.util.concurrent._
import java.util.concurrent.atomic._
-import kafka.utils._
/**
* A scheduler for running jobs in the background
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Range.scala Tue Jan 24 20:54:11 2012
@@ -17,9 +17,8 @@
package kafka.utils
-import scala.math._
-/**
+/**
* A generic range value with a start and end
*/
trait Range {