You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/12/01 03:04:29 UTC
svn commit: r1415909 - in /kafka/branches/0.8:
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/client/
core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/produce...
Author: nehanarkhede
Date: Sat Dec 1 02:04:19 2012
New Revision: 1415909
URL: http://svn.apache.org/viewvc?rev=1415909&view=rev
Log:
KAFKA-608 and KAFKA-630 getTopicMetadata does not respect producer config settings and fix auto-create topic; patched by Neha Narkhede; reviewed by Jun Rao
Added:
kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala
Modified:
kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
kafka/branches/0.8/system_test/testcase_to_run.json
Modified: kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Sat Dec 1 02:04:19 2012
@@ -25,7 +25,8 @@ import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection._
import scala.collection.mutable
-import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
+import kafka.common._
+import scala.Some
object AdminUtils extends Logging {
val rand = new Random
@@ -82,7 +83,7 @@ object AdminUtils extends Logging {
ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
- case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
+ case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2 => throw new AdministrationException(e2.toString)
}
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala Sat Dec 1 02:04:19 2012
@@ -6,20 +6,28 @@ import kafka.api._
import kafka.producer._
import kafka.common.KafkaException
import kafka.utils.{Utils, Logging}
+import java.util.Properties
/**
* Helper functions common to clients (producer, consumer, or admin)
*/
object ClientUtils extends Logging{
- def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
+ /**
+ * Used by the producer to send a metadata request since it has access to the ProducerConfig
+ * @param topics The topics for which the metadata needs to be fetched
+ * @param brokers The brokers in the cluster as configured on the producer through broker.list
+ * @param producerConfig The producer's config
+ * @return topic metadata response
+ */
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
- val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
+ val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
info("Fetching metadata for topic %s".format(topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
@@ -39,7 +47,22 @@ object ClientUtils extends Logging{
}
return topicMetadataResponse
}
-
+
+ /**
+ * Used by a non-producer client to send a metadata request
+ * @param topics The topics for which the metadata needs to be fetched
+ * @param brokers The brokers in the cluster as configured on the client
+ * @param clientId The client's identifier
+ * @return topic metadata response
+ */
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
+ val props = new Properties()
+ props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
+ props.put("clientid", clientId)
+ val producerConfig = new ProducerConfig(props)
+ fetchTopicMetadata(topics, brokers, producerConfig)
+ }
+
/**
* Parse a list of broker urls in the form host1:port1, host2:port2, ...
*/
Modified: kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Sat Dec 1 02:04:19 2012
@@ -47,7 +47,9 @@ private[kafka] case class Broker(val id:
override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
- def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
+ def getZkString(): String = new String(creatorId + ":" + host + ":" + port)
+
+ def getConnectionString(): String = new String(host + ":" + port)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
Added: kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala?rev=1415909&view=auto
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala (added)
+++ kafka/branches/0.8/core/src/main/scala/kafka/common/TopicExistsException.scala Sat Dec 1 02:04:19 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class TopicExistsException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
\ No newline at end of file
Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Dec 1 02:04:19 2012
@@ -176,6 +176,7 @@ object ConsoleConsumer extends Logging {
}
})
+ var numMessages = 0L
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
try {
@@ -188,6 +189,7 @@ object ConsoleConsumer extends Logging {
for(messageAndTopic <- iter) {
try {
formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+ numMessages += 1
} catch {
case e =>
if (skipMessageOnError)
@@ -198,6 +200,7 @@ object ConsoleConsumer extends Logging {
if(System.out.checkError()) {
// This means no one is listening to our output stream any more, time to shutdown
System.err.println("Unable to write to standard out, closing consumer.")
+ System.err.println("Consumed %d messages".format(numMessages))
formatter.close()
connector.shutdown()
System.exit(1)
@@ -206,6 +209,7 @@ object ConsoleConsumer extends Logging {
} catch {
case e => error("Error processing message, stopping consumer: ", e)
}
+ System.out.println("Consumed %d messages".format(numMessages))
System.out.flush()
formatter.close()
connector.shutdown()
Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Dec 1 02:04:19 2012
@@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val
try {
trace("Partitions without leader %s".format(noLeaderPartitionSet))
val brokers = getAllBrokersInCluster(zkClient)
- val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
topicsMetadata.foreach(
tmd => {
Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sat Dec 1 02:04:19 2012
@@ -402,7 +402,7 @@ private[kafka] class ZookeeperConsumerCo
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val brokers = getAllBrokersInCluster(zkClient)
- val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
topicsMetadata.foreach(m =>{
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Sat Dec 1 02:04:19 2012
@@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package kafka.producer
import collection.mutable.HashMap
@@ -72,7 +72,7 @@ class BrokerPartitionInfo(producerConfig
*/
def updateInfo(topics: Set[String]) {
var topicsMetadata: Seq[TopicMetadata] = Nil
- val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
+ val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig)
topicsMetadata = topicMetadataResponse.topicsMetadata
// throw partition specific exception
topicsMetadata.foreach(tmd =>{
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Sat Dec 1 02:04:19 2012
@@ -37,17 +37,6 @@ object ProducerPool {
props.putAll(config.props.props)
new SyncProducer(new SyncProducerConfig(props))
}
-
- /**
- * Used in ClientUtils to send TopicMetadataRequest to a broker.
- */
- def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
- val props = new Properties()
- props.put("host", broker.host)
- props.put("port", broker.port.toString)
- props.put("client.id", clientId)
- new SyncProducer(new SyncProducerConfig(props))
- }
}
class ProducerPool(val config: ProducerConfig) extends Logging {
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Dec 1 02:04:19 2012
@@ -64,5 +64,5 @@ object SyncProducerConfig {
val DefaultCorrelationId = -1
val DefaultClientId = ""
val DefaultRequiredAcks : Short = 0
- val DefaultAckTimeoutMs = 500
+ val DefaultAckTimeoutMs = 1500
}
\ No newline at end of file
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Dec 1 02:04:19 2012
@@ -48,7 +48,7 @@ class DefaultEventHandler[K,V](config: P
lock synchronized {
val serializedData = serialize(events)
serializedData.foreach{
- keyed =>
+ keyed =>
val dataSize = keyed.message.payloadSize
producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
@@ -82,7 +82,7 @@ class DefaultEventHandler[K,V](config: P
try {
for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled)
- messagesPerBrokerMap.foreach(partitionAndEvent =>
+ messagesPerBrokerMap.foreach(partitionAndEvent =>
trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
@@ -173,7 +173,7 @@ class DefaultEventHandler[K,V](config: P
debug("Broker partitions registered for topic: %s are %s"
.format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
val totalNumPartitions = topicPartitionsList.length
- if(totalNumPartitions == 0)
+ if(totalNumPartitions == 0)
throw new NoBrokersForPartitionException("Partition key = " + m.key)
topicPartitionsList
}
@@ -189,10 +189,10 @@ class DefaultEventHandler[K,V](config: P
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
"\n Valid values are > 0")
- val partition =
- if(key == null)
+ val partition =
+ if(key == null)
Utils.abs(counter.getAndIncrement()) % numPartitions
- else
+ else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
@@ -217,11 +217,16 @@ class DefaultEventHandler[K,V](config: P
try {
val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)
- trace("Producer sent messages for topics %s to broker %d on %s:%d"
+ debug("Producer sent messages for topics %s to broker %d on %s:%d"
.format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer request (%s)"
- .format(response, producerRequest))
+ .format(response, producerRequest))
+ if (logger.isTraceEnabled) {
+ val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+ successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
+ trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+ }
response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
.map(partitionStatus => partitionStatus._1)
} catch {
@@ -236,33 +241,33 @@ class DefaultEventHandler[K,V](config: P
private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
/** enforce the compressed.topics config here.
- * If the compression codec is anything other than NoCompressionCodec,
- * Enable compression only for specified topics if any
- * If the list of compressed topics is empty, then enable the specified compression codec for all topics
- * If the compression codec is NoCompressionCodec, compression is disabled for all topics
- */
+ * If the compression codec is anything other than NoCompressionCodec,
+ * Enable compression only for specified topics if any
+ * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+ * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+ */
val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
val rawMessages = messages.map(_.message)
( topicAndPartition,
config.compressionCodec match {
case NoCompressionCodec =>
- trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+ debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
- trace("Sending %d messages with compression codec %d to %s"
+ debug("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
case _ =>
if(config.compressedTopics.contains(topicAndPartition.topic)) {
- trace("Sending %d messages with compression codec %d to %s"
+ debug("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
}
else {
- trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+ debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
.format(messages.size, topicAndPartition, config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Dec 1 02:04:19 2012
@@ -434,9 +434,13 @@ class KafkaApis(val requestChannel: Requ
try {
/* check if auto creation of topics is turned on */
if (config.autoCreateTopics) {
- CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
- info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
- .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+ try {
+ CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+ info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+ .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+ } catch {
+ case e: TopicExistsException => // let it go, possibly another broker created this topic
+ }
val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
topicsMetadata += newTopicMetadata
newTopicMetadata.errorCode match {
Modified: kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Sat Dec 1 02:04:19 2012
@@ -125,7 +125,7 @@ object SimpleConsumerShell extends Loggi
// getting topic metadata
info("Getting topic metatdata...")
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1)
Modified: kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Dec 1 02:04:19 2012
@@ -184,7 +184,7 @@ object ZkUtils extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val broker = new Broker(id, creator, host, port)
try {
- createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+ createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
} catch {
case e: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sat Dec 1 02:04:19 2012
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ZkUtils, TestUtils}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
fail("shouldn't be able to create a topic already exists")
} catch {
- case e: AdministrationException => // this is good
+ case e: TopicExistsException => // this is good
case e2 => throw e2
}
}
Modified: kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Sat Dec 1 02:04:19 2012
@@ -126,11 +126,6 @@ object ProducerPerformance extends Loggi
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
- val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here")
@@ -223,6 +218,7 @@ object ProducerPerformance extends Loggi
private val threadIdLabel = "ThreadID"
private val topicLabel = "Topic"
private var leftPaddedSeqId : String = ""
+
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
// Each thread gets a unique range of sequential no. for its ids.
// Eg. 1000 msg in 10 threads => 100 msg per thread
@@ -246,12 +242,13 @@ object ProducerPerformance extends Loggi
private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
- val message = if(config.seqIdMode) {
- val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
- generateMessageWithSeqId(topic, seqId, msgSize)
- } else {
- new Array[Byte](msgSize)
- }
+ val message =
+ if(config.seqIdMode) {
+ val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+ generateMessageWithSeqId(topic, seqId, msgSize)
+ } else {
+ new Array[Byte](msgSize)
+ }
(new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
}
Modified: kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Sat Dec 1 02:04:19 2012
@@ -392,7 +392,7 @@ class ReplicaBasicTest(ReplicationUtils,
if logRetentionTest.lower() == "false":
self.log_message("starting consumer in the background")
kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
- time.sleep(1)
+ time.sleep(10)
# =============================================
# this testcase is completed - stop all entities
@@ -421,11 +421,11 @@ class ReplicaBasicTest(ReplicationUtils,
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
elif consumerMultiTopicsMode.lower() == "true":
- kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+ #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv)
else:
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv)
- kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+ #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
# =============================================
Modified: kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/cluster_config.json Sat Dec 1 02:04:19 2012
@@ -2,105 +2,56 @@
"cluster_config": [
{
"entity_id": "0",
- "hostname": "localhost",
+ "hostname": "esv4-app18.corp",
"role": "zookeeper",
"cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
+ "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+ "java_home": "/export/apps/jdk/JDK-1_6_0_21",
"jmx_port": "9900"
},
-
-
{
"entity_id": "1",
- "hostname": "localhost",
+ "hostname": "esv4-app18.corp",
"role": "broker",
"cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
+ "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+ "java_home": "/export/apps/jdk/JDK-1_6_0_21",
"jmx_port": "9901"
},
{
"entity_id": "2",
- "hostname": "localhost",
+ "hostname": "esv4-app19.corp",
"role": "broker",
"cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
+ "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+ "java_home": "/export/apps/jdk/JDK-1_6_0_21",
"jmx_port": "9902"
},
{
"entity_id": "3",
- "hostname": "localhost",
+ "hostname": "esv4-app20.corp",
"role": "broker",
"cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
+ "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+ "java_home": "/export/apps/jdk/JDK-1_6_0_21",
"jmx_port": "9903"
},
{
"entity_id": "4",
- "hostname": "localhost",
- "role": "broker",
- "cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
- "jmx_port": "9904"
- },
- {
- "entity_id": "5",
- "hostname": "localhost",
- "role": "broker",
- "cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
- "jmx_port": "9905"
- },
- {
- "entity_id": "6",
- "hostname": "localhost",
- "role": "broker",
- "cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
- "jmx_port": "9906"
- },
- {
- "entity_id": "7",
- "hostname": "localhost",
- "role": "broker",
- "cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
- "jmx_port": "9907"
- },
- {
- "entity_id": "8",
- "hostname": "localhost",
- "role": "broker",
- "cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
- "jmx_port": "9908"
- },
-
-
- {
- "entity_id": "9",
- "hostname": "localhost",
+ "hostname": "esv4-app18.corp",
"role": "producer_performance",
"cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
+ "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+ "java_home": "/export/apps/jdk/JDK-1_6_0_21",
"jmx_port": "9909"
},
{
- "entity_id": "10",
- "hostname": "localhost",
+ "entity_id": "5",
+ "hostname": "esv4-app18.corp",
"role": "console_consumer",
"cluster_name": "source",
- "kafka_home": "default",
- "java_home": "default",
+ "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
+ "java_home": "/export/apps/jdk/JDK-1_6_0_21",
"jmx_port": "9910"
}
]
Modified: kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json Sat Dec 1 02:04:19 2012
@@ -13,7 +13,7 @@
"testcase_args": {
"broker_type": "leader",
"bounce_broker": "false",
- "replica_factor": "8",
+ "replica_factor": "3",
"num_partition": "2",
"num_iteration": "1",
"producer_multi_topics_mode": "true",
@@ -59,51 +59,6 @@
},
{
"entity_id": "4",
- "port": "9094",
- "brokerid": "4",
- "log.file.size": "1048576",
- "log.dir": "/tmp/kafka_server_4_logs",
- "log_filename": "kafka_server_9094.log",
- "config_filename": "kafka_server_9094.properties"
- },
- {
- "entity_id": "5",
- "port": "9095",
- "brokerid": "5",
- "log.file.size": "1048576",
- "log.dir": "/tmp/kafka_server_5_logs",
- "log_filename": "kafka_server_9095.log",
- "config_filename": "kafka_server_9095.properties"
- },
- {
- "entity_id": "6",
- "port": "9096",
- "brokerid": "6",
- "log.file.size": "1048576",
- "log.dir": "/tmp/kafka_server_6_logs",
- "log_filename": "kafka_server_9096.log",
- "config_filename": "kafka_server_9096.properties"
- },
- {
- "entity_id": "7",
- "port": "9097",
- "brokerid": "7",
- "log.file.size": "1048576",
- "log.dir": "/tmp/kafka_server_7_logs",
- "log_filename": "kafka_server_9097.log",
- "config_filename": "kafka_server_9097.properties"
- },
- {
- "entity_id": "8",
- "port": "9098",
- "brokerid": "8",
- "log.file.size": "1048576",
- "log.dir": "/tmp/kafka_server_8_logs",
- "log_filename": "kafka_server_9098.log",
- "config_filename": "kafka_server_9098.properties"
- },
- {
- "entity_id": "9",
"topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
"threads": "5",
"compression-codec": "0",
@@ -111,13 +66,13 @@
"message": "500",
"request-num-acks": "-1",
"producer-retry-backoff-ms": "3500",
- "producer-num-retries": "5",
+ "producer-num-retries": "3",
"async":"false",
"log_filename": "producer_performance_9.log",
"config_filename": "producer_performance_9.properties"
},
{
- "entity_id": "10",
+ "entity_id": "5",
"topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
"groupid": "mytestgroup",
"consumer-timeout-ms": "60000",
Modified: kafka/branches/0.8/system_test/testcase_to_run.json
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/testcase_to_run.json?rev=1415909&r1=1415908&r2=1415909&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/testcase_to_run.json (original)
+++ kafka/branches/0.8/system_test/testcase_to_run.json Sat Dec 1 02:04:19 2012
@@ -1,5 +1,5 @@
{
"ReplicaBasicTest" : [
- "testcase_1"
+ "testcase_9051"
]
}