You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC
[9/30] git commit: KAFKA-646 Provide aggregate stats at the high
level Producer and ZookeeperConsumerConnector level; reviewed by Neha Narkhede
KAFKA-646 Provide aggregate stats at the high level Producer and ZookeeperConsumerConnector level; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ad9018e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ad9018e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ad9018e
Branch: refs/heads/trunk
Commit: 8ad9018ea197676bdc4e57ccf985dc752e54e5c7
Parents: f0d2633
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Mon Dec 17 14:05:17 2012 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Dec 17 14:05:17 2012 -0800
----------------------------------------------------------------------
.../scala/kafka/admin/CreateTopicCommand.scala | 1 +
.../kafka/common/InvalidClientIdException.scala | 22 -----
.../main/scala/kafka/consumer/ConsumerConfig.scala | 31 +++++++-
.../scala/kafka/consumer/ConsumerIterator.scala | 3 +-
.../scala/kafka/consumer/ConsumerTopicStats.scala | 26 +++++-
.../main/scala/kafka/consumer/KafkaStream.scala | 4 +-
.../scala/kafka/consumer/PartitionTopicInfo.scala | 4 +-
.../main/scala/kafka/consumer/SimpleConsumer.scala | 22 +++---
.../consumer/ZookeeperConsumerConnector.scala | 14 +--
core/src/main/scala/kafka/producer/Producer.scala | 61 +++-----------
.../main/scala/kafka/producer/ProducerConfig.scala | 29 +++++++
.../main/scala/kafka/producer/SyncProducer.scala | 22 +++---
.../kafka/producer/async/DefaultEventHandler.scala | 7 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 46 ++++++-----
.../main/scala/kafka/utils/ClientIdAndTopic.scala | 64 ---------------
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 6 +-
.../scala/unit/kafka/integration/FetcherTest.scala | 6 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 40 +++-------
.../test/scala/unit/kafka/utils/ClientIdTest.scala | 60 --------------
.../test/scala/unit/kafka/utils/TestUtils.scala | 1 -
.../test/scala/unit/kafka/utils/TopicTest.scala | 61 --------------
21 files changed, 172 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index e304fee..fd3a397 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -21,6 +21,7 @@ import joptsimple.OptionParser
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import scala.collection.mutable
+import kafka.common.Topic
object CreateTopicCommand extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/common/InvalidClientIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidClientIdException.scala b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
index edf072d..e69de29 100644
--- a/core/src/main/scala/kafka/common/InvalidClientIdException.scala
+++ b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class InvalidClientIdException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 660a977..50a2cd4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -19,9 +19,10 @@ package kafka.consumer
import java.util.Properties
import kafka.api.OffsetRequest
-import kafka.utils.{VerifiableProperties, ZKConfig}
+import kafka.utils._
+import kafka.common.{InvalidConfigException, Config}
-object ConsumerConfig {
+object ConsumerConfig extends Config {
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
val FetchSize = 1024 * 1024
@@ -43,6 +44,28 @@ object ConsumerConfig {
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
+
+ def validate(config: ConsumerConfig) {
+ validateClientId(config.clientId)
+ validateGroupId(config.groupId)
+ validateAutoOffsetReset(config.autoOffsetReset)
+ }
+
+ def validateClientId(clientId: String) {
+ validateChars("clientid", clientId)
+ }
+
+ def validateGroupId(groupId: String) {
+ validateChars("groupid", groupId)
+ }
+
+ def validateAutoOffsetReset(autoOffsetReset: String) {
+ autoOffsetReset match {
+ case OffsetRequest.SmallestTimeString =>
+ case OffsetRequest.LargestTimeString =>
+ case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
+ }
+ }
}
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
@@ -109,8 +132,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
/**
- * Cliient id is specified by the kafka consumer client, used to distinguish different clients
+ * Client id is specified by the kafka consumer client, used to distinguish different clients
*/
val clientId = props.getString("clientid", groupId)
+
+ validate(this)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index c5062fc..746a4bd 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -35,12 +35,13 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val enableShallowIterator: Boolean,
- val consumerTopicStats: ConsumerTopicStats)
+ val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
+ private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index 2a9d9fb..bb38f35 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -17,20 +17,25 @@
package kafka.consumer
-import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
+import kafka.utils.{Pool, threadsafe, Logging}
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ClientIdAndTopic
@threadsafe
-class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
- val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
+ val messageRate = newMeter(metricId + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
}
+/**
+ * Tracks metrics for each topic the given consumer client has consumed data from.
+ * @param clientId The clientId of the given consumer client.
+ */
class ConsumerTopicStats(clientId: String) extends Logging {
private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
- private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+ private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
@@ -39,3 +44,14 @@ class ConsumerTopicStats(clientId: String) extends Logging {
}
}
+/**
+ * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map.
+ */
+object ConsumerTopicStatsRegistry {
+ private val valueFactory = (k: String) => new ConsumerTopicStats(k)
+ private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))
+
+ def getConsumerTopicStat(clientId: String) = {
+ globalStats.getAndMaybePut(clientId)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 569f6df..d4e0e96 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -27,11 +27,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val enableShallowIterator: Boolean,
- val consumerTopicStats: ConsumerTopicStats)
+ val clientId: String)
extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
private val iter: ConsumerIterator[K,V] =
- new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
+ new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId)
/**
* Create an iterator over messages in the stream.
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 8c42d11..6003cab 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -29,11 +29,13 @@ class PartitionTopicInfo(val topic: String,
private val consumedOffset: AtomicLong,
private val fetchedOffset: AtomicLong,
private val fetchSize: AtomicInteger,
- private val consumerTopicStats: ConsumerTopicStats) extends Logging {
+ private val clientId: String) extends Logging {
debug("initial consumer offset of " + this + " is " + consumedOffset.get)
debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
+ private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
+
def getConsumeOffset() = consumedOffset.get
def getFetchOffset() = fetchedOffset.get
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5e1e6ab..6b83deb 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -20,8 +20,6 @@ package kafka.consumer
import kafka.api._
import kafka.network._
import kafka.utils._
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.utils.ZkUtils._
import collection.immutable
import kafka.common.{TopicAndPartition, KafkaException}
@@ -87,10 +85,11 @@ class SimpleConsumer(val host: String,
val bufferSize: Int,
val clientId: String) extends Logging {
- ClientId.validate(clientId)
+ ConsumerConfig.validateClientId(clientId)
private val lock = new Object()
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
- private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
+ val brokerInfo = "host_%s-port_%s".format(host, port)
+ private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
private def connect(): BlockingChannel = {
close
@@ -155,12 +154,17 @@ class SimpleConsumer(val host: String,
*/
def fetch(request: FetchRequest): FetchResponse = {
var response: Receive = null
- fetchRequestAndResponseStats.requestTimer.time {
- response = sendRequest(request)
+ val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+ val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
+ aggregateTimer.time {
+ specificTimer.time {
+ response = sendRequest(request)
+ }
}
val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes
- fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
+ fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+ fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
fetchResponse
}
@@ -178,7 +182,3 @@ class SimpleConsumer(val host: String,
}
}
-class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 43e9fa6..aee9293 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -20,7 +20,7 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
import locks.ReentrantLock
-import scala.collection._
+import collection._
import kafka.cluster._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -35,6 +35,7 @@ import kafka.client.ClientUtils
import com.yammer.metrics.core.Gauge
import kafka.api.OffsetRequest
import kafka.metrics._
+import scala.Some
/**
@@ -80,7 +81,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
extends ConsumerConnector with Logging with KafkaMetricsGroup {
- ClientId.validate(config.clientId)
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[ConsumerFetcherManager] = None
@@ -95,8 +95,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
- private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
-
val consumerIdString = {
var consumerUuid : String = null
config.consumerId match {
@@ -198,7 +196,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val stream = new KafkaStream[K,V](
- queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
+ queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId)
(queue, stream)
})
).flatten.toList
@@ -601,8 +599,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
case OffsetRequest.LargestTimeString =>
SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
- case _ =>
- throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
}
}
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
@@ -615,7 +611,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
consumedOffset,
fetchedOffset,
new AtomicInteger(config.fetchSize),
- consumerTopicStats)
+ config.clientId)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
}
@@ -719,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
keyDecoder,
valueDecoder,
config.enableShallowIterator,
- consumerTopicStats)
+ config.clientId)
(queue, stream)
}).toList
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 3bfd563..a183525 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -22,20 +22,15 @@ import java.util.Random
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{QueueFullException, InvalidConfigException}
+import kafka.common.QueueFullException
import kafka.metrics._
class Producer[K,V](config: ProducerConfig,
- private val eventHandler: EventHandler[K,V],
- private val producerStats: ProducerStats,
- private val producerTopicStats: ProducerTopicStats) // only for unit testing
+ private val eventHandler: EventHandler[K,V]) // only for unit testing
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
- if (config.batchSize > config.queueSize)
- throw new InvalidConfigException("Batch size can't be larger than queue size.")
-
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
private val random = new Random
@@ -53,30 +48,20 @@ class Producer[K,V](config: ProducerConfig,
config.batchSize,
config.clientId)
producerSendThread.start()
- case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
}
- KafkaMetricsReporter.startReporters(config.props)
+ private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
+ private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
- def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
- this(t._1, t._2, t._3, t._4)
+ KafkaMetricsReporter.startReporters(config.props)
def this(config: ProducerConfig) =
- this {
- ClientId.validate(config.clientId)
- val producerStats = new ProducerStats(config.clientId)
- val producerTopicStats = new ProducerTopicStats(config.clientId)
- (config,
- new DefaultEventHandler[K,V](config,
- Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
- Utils.createObject[Encoder[V]](config.serializerClass, config.props),
- Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
- new ProducerPool(config),
- producerStats = producerStats,
- producerTopicStats = producerTopicStats),
- producerStats,
- producerTopicStats)
- }
+ this(config,
+ new DefaultEventHandler[K,V](config,
+ Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+ Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+ Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+ new ProducerPool(config)))
/**
* Sends the data, partitioned by key to the topic using either the
@@ -146,28 +131,4 @@ class Producer[K,V](config: ProducerConfig,
}
}
-@threadsafe
-class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
- val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
-}
-
-class ProducerTopicStats(clientId: String) {
- private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
- private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
- private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
-
- def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
-
- def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
- }
-}
-
-class ProducerStats(clientId: String) extends KafkaMetricsGroup {
- val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
- val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
- val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
- val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 30b1dc3..e559187 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -21,9 +21,36 @@ import async.AsyncProducerConfig
import java.util.Properties
import kafka.utils.{Utils, VerifiableProperties}
import kafka.message.{CompressionCodec, NoCompressionCodec}
+import kafka.common.{InvalidConfigException, Config}
+
+object ProducerConfig extends Config {
+ def validate(config: ProducerConfig) {
+ validateClientId(config.clientId)
+ validateBatchSize(config.batchSize, config.queueSize)
+ validateProducerType(config.producerType)
+ }
+
+ def validateClientId(clientId: String) {
+ validateChars("clientid", clientId)
+ }
+
+ def validateBatchSize(batchSize: Int, queueSize: Int) {
+ if (batchSize > queueSize)
+ throw new InvalidConfigException("Batch size = " + batchSize + " can't be larger than queue size = " + queueSize)
+ }
+
+ def validateProducerType(producerType: String) {
+ producerType match {
+ case "sync" =>
+ case "async"=>
+ case _ => throw new InvalidConfigException("Invalid value " + producerType + " for producer.type, valid values are sync/async")
+ }
+ }
+}
class ProducerConfig private (val props: VerifiableProperties)
extends AsyncProducerConfig with SyncProducerConfigShared {
+ import ProducerConfig._
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
@@ -85,4 +112,6 @@ class ProducerConfig private (val props: VerifiableProperties)
val producerRetries = props.getInt("producer.num.retries", 3)
val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+
+ validate(this)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 3183ceb..0ef320b 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -21,8 +21,6 @@ import kafka.api._
import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
object SyncProducer {
val RequestKey: Short = 0
@@ -39,7 +37,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
config.bufferSize, config.requestTimeoutMs)
- val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
+ val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
+ val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
trace("Instantiating Scala Sync Producer")
@@ -87,10 +86,17 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
* Send a message
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
- producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
+ val requestSize = producerRequest.sizeInBytes
+ producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize)
+ producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
+
var response: Receive = null
- producerRequestStats.requestTimer.time {
- response = doSend(producerRequest)
+ val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer
+ val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
+ aggregateTimer.time {
+ specificTimer.time {
+ response = doSend(producerRequest)
+ }
}
ProducerResponse.readFrom(response.buffer)
}
@@ -150,7 +156,3 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
}
}
-class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 7d0f609..4f04862 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -33,9 +33,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val encoder: Encoder[V],
private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
- private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
- private val producerStats: ProducerStats,
- private val producerTopicStats: ProducerTopicStats)
+ private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
@@ -45,6 +43,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val lock = new Object()
+ private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
+ private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
+
def handle(events: Seq[KeyedMessage[K,V]]) {
lock synchronized {
val serializedData = serialize(events)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6d73c82..96f0df6 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,7 +19,7 @@ package kafka.server
import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import collection.mutable
import kafka.message.ByteBufferMessageSet
import kafka.message.MessageAndOffset
@@ -27,7 +27,7 @@ import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
+import kafka.utils.{Pool, ShutdownableThread}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@@ -43,9 +43,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
private val partitionMapCond = partitionMapLock.newCondition()
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
- val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
+ private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+ val fetcherStats = new FetcherStats(metricId)
val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
- val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
+ val fetcherLagStats = new FetcherLagStats(metricId)
/* callbacks to be defined in subclass */
@@ -66,7 +67,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
override def doWork() {
val fetchRequestuilder = new FetchRequestBuilder().
- clientId(clientId + "-" + brokerInfo).
+ clientId(clientId).
replicaId(fetcherBrokerId).
maxWait(maxWait).
minBytes(minBytes)
@@ -184,10 +185,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
-class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
private[this] var lagVal = new AtomicLong(-1L)
newGauge(
- clientIdTopicPartition + "-ConsumerLag",
+ metricId + "-ConsumerLag",
new Gauge[Long] {
def getValue = lagVal.get
}
@@ -200,29 +201,34 @@ class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends
def lag = lagVal.get
}
-class FetcherLagStats(clientId: String) {
- private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
- private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(metricId: ClientIdAndBroker) {
+ private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
+ private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
- stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
+ stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
}
}
-class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
- val requestRate = newMeter(clientIdTopic + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
- val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(metricId: ClientIdBrokerTopic) extends KafkaMetricsGroup {
+ val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+ val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
}
-class FetcherStats(clientId: String) {
- private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
- private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
+class FetcherStats(metricId: ClientIdAndBroker) {
+ private val valueFactory = (k: ClientIdBrokerTopic) => new FetcherMetrics(k)
+ private val stats = new Pool[ClientIdBrokerTopic, FetcherMetrics](Some(valueFactory))
def getFetcherStats(name: String): FetcherMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
+ stats.getAndMaybePut(new ClientIdBrokerTopic(metricId.clientId, metricId.brokerInfo, name))
}
}
-case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
- override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+case class ClientIdBrokerTopic(clientId: String, brokerInfo: String, topic: String) {
+ override def toString = "%s-%s-%s".format(clientId, brokerInfo, topic)
}
+
+case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
+ override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
index 780339e..e69de29 100644
--- a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
+++ b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.common.InvalidTopicException
-import kafka.common.InvalidClientIdException
-import util.matching.Regex
-
-object ClientId {
- val legalChars = "[a-zA-Z0-9_-]"
- val maxNameLength = 200 // to prevent hitting filename max length limit
- private val rgx = new Regex(legalChars + "*")
-
- def validate(clientId: String) {
- if (clientId.length > maxNameLength)
- throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
-
- rgx.findFirstIn(clientId) match {
- case Some(t) =>
- if (!t.equals(clientId))
- throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
- case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
- }
- }
-}
-
-object Topic {
- val legalChars = "[a-zA-Z0-9_-]"
- val maxNameLength = 255
- private val rgx = new Regex(legalChars + "+")
-
- def validate(topic: String) {
- if (topic.length <= 0)
- throw new InvalidTopicException("topic name is illegal, can't be empty")
- else if (topic.length > maxNameLength)
- throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
- rgx.findFirstIn(topic) match {
- case Some(t) =>
- if (!t.equals(topic))
- throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
- case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
- }
- }
-}
-
-case class ClientIdAndTopic(clientId: String, topic:String) {
- override def toString = "%s-%s".format(clientId, topic)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 246b1ec..0b5363f 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -56,7 +56,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
new AtomicLong(consumedOffset),
new AtomicLong(0),
new AtomicInteger(0),
- new ConsumerTopicStats("")))
+ ""))
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
override def setUp() {
@@ -80,8 +80,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
new StringDecoder(),
new StringDecoder(),
enableShallowIterator = false,
- consumerTopicStats = new ConsumerTopicStats(""))
- var receivedMessages = (0 until 5).map(i => iter.next.message).toList
+ clientId = "")
+ val receivedMessages = (0 until 5).map(i => iter.next.message).toList
assertFalse(iter.hasNext)
assertEquals(1, queue.size) // This is only the shutdown command.
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 021f419..61d9fc9 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -50,7 +50,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
new AtomicLong(0),
new AtomicLong(0),
new AtomicInteger(0),
- new ConsumerTopicStats("")))
+ ""))
var fetcher: ConsumerFetcherManager = null
@@ -84,7 +84,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
- val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder())
+ val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+ new DefaultEncoder(),
+ new StringEncoder())
val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
messages += conf.brokerId -> ms
producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 90a7ed8..4767618 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -68,10 +68,7 @@ class AsyncProducerTest extends JUnit3Suite {
val config = new ProducerConfig(props)
val produceData = getProduceData(12)
- val producer = new Producer[String, String](config,
- mockEventHandler,
- new ProducerStats(""),
- new ProducerTopicStats(""))
+ val producer = new Producer[String, String](config, mockEventHandler)
try {
// send all 10 messages, should hit the batch size and then reach broker
producer.send(produceData: _*)
@@ -192,9 +189,7 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = new IntEncoder(),
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
val topic1Broker1Data =
ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -233,9 +228,7 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder,
keyEncoder = new StringEncoder,
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
val serializedData = handler.serialize(produceData)
val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -263,9 +256,7 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
try {
handler.partitionAndCollate(producerDataList)
fail("Should fail with UnknownTopicOrPartitionException")
@@ -296,9 +287,7 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder,
keyEncoder = new StringEncoder,
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
try {
handler.handle(producerDataList)
fail("Should fail with NoBrokersForPartitionException")
@@ -345,9 +334,7 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -390,14 +377,9 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder,
keyEncoder = new StringEncoder,
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
-
- val producer = new Producer[String, String](config,
- handler,
- new ProducerStats(""),
- new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
+
+ val producer = new Producer[String, String](config, handler)
try {
// send all 10 messages, should create 2 batches and 2 syncproducer calls
producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
@@ -453,9 +435,7 @@ class AsyncProducerTest extends JUnit3Suite {
encoder = new StringEncoder(),
keyEncoder = new NullEncoder[Int](),
producerPool = producerPool,
- topicPartitionInfos = topicPartitionInfos,
- producerStats = new ProducerStats(""),
- producerTopicStats = new ProducerTopicStats(""))
+ topicPartitionInfos = topicPartitionInfos)
val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
handler.handle(data)
handler.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
index 794dcdc..e69de29 100644
--- a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidClientIdException
-import org.junit.Test
-
-class ClientIdTest {
-
- @Test
- def testInvalidClientIds() {
- val invalidclientIds = new ArrayBuffer[String]()
- var longName = "ATCG"
- for (i <- 1 to 6)
- longName += longName
- invalidclientIds += longName
- val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
- for (weirdChar <- badChars) {
- invalidclientIds += "Is" + weirdChar + "funny"
- }
-
- for (i <- 0 until invalidclientIds.size) {
- try {
- ClientId.validate(invalidclientIds(i))
- fail("Should throw InvalidClientIdException.")
- }
- catch {
- case e: InvalidClientIdException => "This is good."
- }
- }
-
- val validClientIds = new ArrayBuffer[String]()
- validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
- for (i <- 0 until validClientIds.size) {
- try {
- ClientId.validate(validClientIds(i))
- }
- catch {
- case e: Exception => fail("Should not throw exception.")
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a145f2a..cebb371 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,7 +24,6 @@ import java.nio.channels._
import java.util.Random
import java.util.Properties
import junit.framework.Assert._
-import kafka.api._
import kafka.server._
import kafka.producer._
import kafka.message._
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad9018e/core/src/test/scala/unit/kafka/utils/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TopicTest.scala b/core/src/test/scala/unit/kafka/utils/TopicTest.scala
index e567be5..e69de29 100644
--- a/core/src/test/scala/unit/kafka/utils/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/TopicTest.scala
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidTopicException
-import org.junit.Test
-
-class TopicTest {
-
- @Test
- def testInvalidTopicNames() {
- val invalidTopicNames = new ArrayBuffer[String]()
- invalidTopicNames += ("", ".", "..")
- var longName = "ATCG"
- for (i <- 1 to 6)
- longName += longName
- invalidTopicNames += longName
- val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
- for (weirdChar <- badChars) {
- invalidTopicNames += "Is" + weirdChar + "funny"
- }
-
- for (i <- 0 until invalidTopicNames.size) {
- try {
- Topic.validate(invalidTopicNames(i))
- fail("Should throw InvalidTopicException.")
- }
- catch {
- case e: InvalidTopicException => "This is good."
- }
- }
-
- val validTopicNames = new ArrayBuffer[String]()
- validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
- for (i <- 0 until validTopicNames.size) {
- try {
- Topic.validate(validTopicNames(i))
- }
- catch {
- case e: Exception => fail("Should not throw exception.")
- }
- }
- }
-}