You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC
[24/30] git commit: KAFKA-622 Create mbeans per client;
patched by Swapnil; reviewed by Neha Narkhede
KAFKA-622 Create mbeans per client; patched by Swapnil; reviewed by Neha Narkhede
git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1415021 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d7c71c09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d7c71c09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d7c71c09
Branch: refs/heads/trunk
Commit: d7c71c0949e0cbae8963ca76bc69d64bee8f10af
Parents: e556063
Author: Neha Narkhede <ne...@apache.org>
Authored: Thu Nov 29 01:31:18 2012 +0000
Committer: Neha Narkhede <ne...@apache.org>
Committed: Thu Nov 29 01:31:18 2012 +0000
----------------------------------------------------------------------
.../src/main/java/kafka/etl/KafkaETLContext.java | 2 +-
core/src/main/scala/kafka/api/FetchRequest.scala | 2 +-
core/src/main/scala/kafka/client/ClientUtils.scala | 4 +-
.../kafka/common/InvalidClientIdException.scala | 22 +++++
.../kafka/consumer/ConsumerFetcherManager.scala | 2 +-
.../scala/kafka/consumer/ConsumerIterator.scala | 7 +-
.../scala/kafka/consumer/ConsumerTopicStat.scala | 40 --------
.../scala/kafka/consumer/ConsumerTopicStats.scala | 41 ++++++++
.../main/scala/kafka/consumer/KafkaStream.scala | 5 +-
.../scala/kafka/consumer/PartitionTopicInfo.scala | 7 +-
.../main/scala/kafka/consumer/SimpleConsumer.scala | 23 +++--
.../consumer/ZookeeperConsumerConnector.scala | 21 +++--
.../scala/kafka/controller/KafkaController.scala | 2 +-
.../kafka/controller/PartitionLeaderSelector.scala | 6 +-
.../kafka/controller/PartitionStateMachine.scala | 4 +-
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../kafka/javaapi/consumer/SimpleConsumer.scala | 6 +-
.../consumer/ZookeeperConsumerConnector.scala | 1 -
core/src/main/scala/kafka/log/Log.scala | 6 +-
.../kafka/metrics/KafkaCSVMetricsReporter.scala | 1 -
.../scala/kafka/producer/BrokerPartitionInfo.scala | 6 +-
.../scala/kafka/producer/ConsoleProducer.scala | 1 -
.../scala/kafka/producer/DefaultPartitioner.scala | 1 -
core/src/main/scala/kafka/producer/Producer.scala | 73 +++++++++------
.../main/scala/kafka/producer/ProducerPool.scala | 29 ++++--
.../main/scala/kafka/producer/SyncProducer.scala | 14 +--
.../scala/kafka/producer/SyncProducerConfig.scala | 2 +-
.../kafka/producer/async/AsyncProducerStats.scala | 25 -----
.../kafka/producer/async/DefaultEventHandler.scala | 14 ++-
.../kafka/producer/async/ProducerSendThread.scala | 5 +-
core/src/main/scala/kafka/serializer/Decoder.scala | 1 -
core/src/main/scala/kafka/serializer/Encoder.scala | 2 -
.../scala/kafka/server/AbstractFetcherThread.scala | 45 +++++----
core/src/main/scala/kafka/server/KafkaApis.scala | 16 ++--
.../scala/kafka/server/KafkaRequestHandler.scala | 8 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 8 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
.../scala/kafka/tools/ConsumerOffsetChecker.scala | 2 +-
.../main/scala/kafka/tools/GetOffsetShell.scala | 2 +-
core/src/main/scala/kafka/tools/MirrorMaker.scala | 1 -
.../main/scala/kafka/tools/ReplayLogProducer.scala | 2 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 8 +-
.../main/scala/kafka/tools/UpdateOffsetsInZK.scala | 2 +-
.../main/scala/kafka/utils/ClientIdAndTopic.scala | 64 +++++++++++++
core/src/main/scala/kafka/utils/Topic.scala | 41 --------
.../test/scala/other/kafka/TestKafkaAppender.scala | 1 -
.../scala/other/kafka/TestZKConsumerOffsets.scala | 1 -
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 6 +-
.../kafka/integration/AutoOffsetResetTest.scala | 1 -
.../scala/unit/kafka/integration/FetcherTest.scala | 4 +-
.../kafka/integration/LazyInitProducerTest.scala | 2 +-
.../unit/kafka/integration/PrimitiveApiTest.scala | 1 -
.../integration/ProducerConsumerTestHarness.scala | 6 +-
.../consumer/ZookeeperConsumerConnectorTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogOffsetTest.scala | 2 +-
.../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 3 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 65 ++++++++-----
.../scala/unit/kafka/producer/ProducerTest.scala | 5 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 -
.../unit/kafka/server/ServerShutdownTest.scala | 7 +-
.../test/scala/unit/kafka/utils/ClientIdTest.scala | 61 ++++++++++++
.../java/kafka/examples/SimpleConsumerDemo.java | 3 +-
.../kafka/perf/SimpleConsumerPerformance.scala | 2 +-
63 files changed, 441 insertions(+), 311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index b0e75bc..8e98efc 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -109,7 +109,7 @@ public class KafkaETLContext {
// read data from queue
URI uri = _request.getURI();
- _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);
+ _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext");
// get available offset range
_offsetRange = getOffsetRange();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 9892fb3..4ed071a 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -33,7 +33,7 @@ object FetchRequest {
val CurrentVersion = 1.shortValue()
val DefaultMaxWait = 0
val DefaultMinBytes = 0
- val ReplicaFetcherClientId = "replica fetcher"
+ val ReplicaFetcherClientId = "replica-fetcher"
val DefaultCorrelationId = 0
def readFrom(buffer: ByteBuffer): FetchRequest = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index aeead2d..e6e7200 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -12,14 +12,14 @@ import kafka.utils.{Utils, Logging}
*/
object ClientUtils extends Logging{
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+ def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
- val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+ val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
info("Fetching metadata for topic %s".format(topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/common/InvalidClientIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidClientIdException.scala b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
new file mode 100644
index 0000000..edf072d
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InvalidClientIdException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class InvalidClientIdException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index bdc020b..a80fac9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -54,7 +54,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
try {
trace("Partitions without leader %s".format(noLeaderPartitionSet))
val brokers = getAllBrokersInCluster(zkClient)
- val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
topicsMetadata.foreach(
tmd => {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index fd80104..c5062fc 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,7 +34,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
- val enableShallowIterator: Boolean)
+ val enableShallowIterator: Boolean,
+ val consumerTopicStats: ConsumerTopicStats)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -48,8 +49,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
- ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
- ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
+ consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+ consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
item
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
index 2c9c204..e69de29 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import kafka.utils.{Pool, threadsafe, Logging}
-import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
-
-@threadsafe
-class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
- val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-}
-
-object ConsumerTopicStat extends Logging {
- private val valueFactory = (k: String) => new ConsumerTopicStat(k)
- private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
- private val allTopicStat = new ConsumerTopicStat("AllTopics")
-
- def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
-
- def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
- stats.getAndMaybePut(topic + "-")
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
new file mode 100644
index 0000000..2a9d9fb
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
+
+@threadsafe
+class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+ val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+}
+
+class ConsumerTopicStats(clientId: String) extends Logging {
+ private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
+ private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
+ private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+
+ def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
+
+ def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 115d41a..569f6df 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
- val enableShallowIterator: Boolean)
+ val enableShallowIterator: Boolean,
+ val consumerTopicStats: ConsumerTopicStats)
extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
private val iter: ConsumerIterator[K,V] =
- new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
+ new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
/**
* Create an iterator over messages in the stream.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 5249ddc..8c42d11 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -28,7 +28,8 @@ class PartitionTopicInfo(val topic: String,
private val chunkQueue: BlockingQueue[FetchedDataChunk],
private val consumedOffset: AtomicLong,
private val fetchedOffset: AtomicLong,
- private val fetchSize: AtomicInteger) extends Logging {
+ private val fetchSize: AtomicInteger,
+ private val consumerTopicStats: ConsumerTopicStats) extends Logging {
debug("initial consumer offset of " + this + " is " + consumedOffset.get)
debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
@@ -58,8 +59,8 @@ class PartitionTopicInfo(val topic: String,
chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
fetchedOffset.set(next)
debug("updated fetch offset of (%s) to %d".format(this, next))
- ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
- ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
+ consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
+ consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d642a67..e42923a 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -31,12 +31,12 @@ import kafka.cluster.Broker
object SimpleConsumer extends Logging {
def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
- isFromOrdinaryConsumer: Boolean): Long = {
+ clientId: String, isFromOrdinaryConsumer: Boolean): Long = {
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
try {
simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
- ConsumerConfig.SocketBufferSize)
+ ConsumerConfig.SocketBufferSize, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = if(isFromOrdinaryConsumer)
new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
@@ -56,14 +56,14 @@ object SimpleConsumer extends Logging {
}
def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
- earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+ earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = {
val cluster = getCluster(zkClient)
val broker = cluster.getBroker(brokerId) match {
case Some(b) => b
case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
"getOffsetsBefore request")
}
- earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer)
+ earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer)
}
}
@@ -75,10 +75,13 @@ object SimpleConsumer extends Logging {
class SimpleConsumer(val host: String,
val port: Int,
val soTimeout: Int,
- val bufferSize: Int) extends Logging {
+ val bufferSize: Int,
+ val clientId: String) extends Logging {
+ ClientId.validate(clientId)
private val lock = new Object()
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
+ private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
private def connect(): BlockingChannel = {
close
@@ -143,12 +146,12 @@ class SimpleConsumer(val host: String,
*/
def fetch(request: FetchRequest): FetchResponse = {
var response: Receive = null
- FetchRequestAndResponseStat.requestTimer.time {
+ fetchRequestAndResponseStats.requestTimer.time {
response = sendRequest(request)
}
val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes
- FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
+ fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
fetchResponse
}
@@ -166,7 +169,7 @@ class SimpleConsumer(val host: String,
}
}
-object FetchRequestAndResponseStat extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val respondSizeHist = newHistogram("FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+ val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 227c90d..fbb82a2 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -35,7 +35,6 @@ import kafka.client.ClientUtils
import com.yammer.metrics.core.Gauge
import kafka.api.OffsetRequest
import kafka.metrics._
-import kafka.producer.ProducerConfig
/**
@@ -80,6 +79,8 @@ private[kafka] object ZookeeperConsumerConnector {
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
extends ConsumerConnector with Logging with KafkaMetricsGroup {
+
+ ClientId.validate(config.clientId)
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[ConsumerFetcherManager] = None
@@ -94,6 +95,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+ private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
+
val consumerIdString = {
var consumerUuid : String = null
config.consumerId match {
@@ -195,7 +198,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val stream = new KafkaStream[K,V](
- queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
+ queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
(queue, stream)
})
).flatten.toList
@@ -399,7 +402,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val brokers = getAllBrokersInCluster(zkClient)
- val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
topicsMetadata.foreach(m =>{
@@ -595,9 +598,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case None =>
config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
- SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime)
+ SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
case OffsetRequest.LargestTimeString =>
- SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime)
+ SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
case _ =>
throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
}
@@ -611,7 +614,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
queue,
consumedOffset,
fetchedOffset,
- new AtomicInteger(config.fetchSize))
+ new AtomicInteger(config.fetchSize),
+ consumerTopicStats)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
}
@@ -667,7 +671,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val q = e._2._1
topicThreadIdAndQueues.put(topicThreadId, q)
newGauge(
- config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+ config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
new Gauge[Int] {
def getValue = q.size
}
@@ -714,7 +718,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
- config.enableShallowIterator)
+ config.enableShallowIterator,
+ consumerTopicStats)
(queue, stream)
}).toList
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3b1a53f..4840c0c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -961,7 +961,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
-object ControllerStat extends KafkaMetricsGroup {
+object ControllerStats extends KafkaMetricsGroup {
val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3e5435e..3eb23cd 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -58,12 +58,12 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
.format(liveAssignedReplicasToThisPartition.mkString(",")))
liveAssignedReplicasToThisPartition.isEmpty match {
case true =>
- ControllerStat.offlinePartitionRate.mark()
+ ControllerStats.offlinePartitionRate.mark()
throw new PartitionOfflineException(("No replica for partition " +
"([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
- ControllerStat.uncleanLeaderElectionRate.mark()
+ ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicasToThisPartition.head
warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
"There's potential data loss")
@@ -78,7 +78,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
partition))
(newLeaderAndIsr, liveAssignedReplicasToThisPartition)
case None =>
- ControllerStat.offlinePartitionRate.mark()
+ ControllerStats.offlinePartitionRate.mark()
throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
"replicas assigned to it")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 17dfbee..0278782 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -223,7 +223,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
liveAssignedReplicas.size match {
case 0 =>
- ControllerStat.offlinePartitionRate.mark()
+ ControllerStats.offlinePartitionRate.mark()
throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " +
"[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
@@ -249,7 +249,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// read the controller epoch
val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
topicAndPartition.partition).get
- ControllerStat.offlinePartitionRate.mark()
+ ControllerStats.offlinePartitionRate.mark()
throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
.format(topicAndPartition) + " since Leader and isr path already exists with value " +
"%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5c8aec5..1753947 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -227,7 +227,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
- ControllerStat.leaderElectionTimer.time {
+ ControllerStats.leaderElectionTimer.time {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
if(!isShuttingDown.get()) {
controllerContext.controllerLock synchronized {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index 803ec4b..58c7081 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -29,8 +29,10 @@ import kafka.javaapi.OffsetRequest
class SimpleConsumer(val host: String,
val port: Int,
val soTimeout: Int,
- val bufferSize: Int) {
- private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+ val bufferSize: Int,
+ val clientId: String) {
+
+ private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
/**
* Fetch a set of messages from a topic. This version of the fetch method
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 2b20aa4..14c4c8a 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -16,7 +16,6 @@
*/
package kafka.javaapi.consumer
-import kafka.message.Message
import kafka.serializer._
import kafka.consumer._
import scala.collection.JavaConversions.asList
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4cb2445..c2fccec 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic._
import kafka.utils._
import scala.math._
import java.text.NumberFormat
-import kafka.server.BrokerTopicStat
+import kafka.server.BrokerTopicStats
import kafka.message._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
@@ -244,8 +244,8 @@ private[kafka] class Log(val dir: File,
if(messageSetInfo.count == 0) {
(-1L, -1L)
} else {
- BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
- BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
+ BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
+ BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index fda0b24..ea9559f 100644
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -24,7 +24,6 @@ import com.yammer.metrics.Metrics
import java.io.File
import com.yammer.metrics.reporting.CsvReporter
import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.{Utils, VerifiableProperties, Logging}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 27bae5e..938504a 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -68,11 +68,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
/**
* It updates the cache by issuing a get topic metadata request to a random broker.
- * @param topic the topic for which the metadata is to be fetched
+ * @param topics the topics for which the metadata is to be fetched
*/
- def updateInfo(topics: Set[String]) = {
+ def updateInfo(topics: Set[String]) {
var topicsMetadata: Seq[TopicMetadata] = Nil
- val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers)
+ val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
topicsMetadata = topicMetadataResponse.topicsMetadata
// throw partition specific exception
topicsMetadata.foreach(tmd =>{
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index e7f50e4..8c32115 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -20,7 +20,6 @@ package kafka.producer
import scala.collection.JavaConversions._
import joptsimple._
import java.util.Properties
-import java.util.regex._
import java.io._
import kafka.common._
import kafka.message._
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index d0a89eb..c82670e 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -17,7 +17,6 @@
package kafka.producer
-import kafka.utils.Utils
import kafka.utils._
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 7b8926c..3bfd563 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -16,7 +16,7 @@
*/
package kafka.producer
-import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
+import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
import kafka.utils._
import java.util.Random
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
@@ -27,8 +27,11 @@ import kafka.metrics._
class Producer[K,V](config: ProducerConfig,
- private val eventHandler: EventHandler[K,V]) // for testing only
-extends Logging {
+ private val eventHandler: EventHandler[K,V],
+ private val producerStats: ProducerStats,
+ private val producerTopicStats: ProducerTopicStats) // only for unit testing
+ extends Logging {
+
private val hasShutdown = new AtomicBoolean(false)
if (config.batchSize > config.queueSize)
throw new InvalidConfigException("Batch size can't be larger than queue size.")
@@ -47,25 +50,38 @@ extends Logging {
queue,
eventHandler,
config.queueTime,
- config.batchSize)
+ config.batchSize,
+ config.clientId)
producerSendThread.start()
case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
}
KafkaMetricsReporter.startReporters(config.props)
+ def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
+ this(t._1, t._2, t._3, t._4)
+
def this(config: ProducerConfig) =
- this(config,
- new DefaultEventHandler[K,V](config,
- Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
- Utils.createObject[Encoder[V]](config.serializerClass, config.props),
- Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
- new ProducerPool(config)))
+ this {
+ ClientId.validate(config.clientId)
+ val producerStats = new ProducerStats(config.clientId)
+ val producerTopicStats = new ProducerTopicStats(config.clientId)
+ (config,
+ new DefaultEventHandler[K,V](config,
+ Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+ Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+ Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+ new ProducerPool(config),
+ producerStats = producerStats,
+ producerTopicStats = producerTopicStats),
+ producerStats,
+ producerTopicStats)
+ }
/**
* Sends the data, partitioned by key to the topic using either the
* synchronous or the asynchronous producer
- * @param producerData the producer data object that encapsulates the topic, key and message data
+ * @param messages the producer data object that encapsulates the topic, key and message data
*/
def send(messages: KeyedMessage[K,V]*) {
if (hasShutdown.get)
@@ -79,8 +95,8 @@ extends Logging {
private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
- ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
- ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
+ producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
+ producerTopicStats.getProducerAllTopicStats.messageRate.mark()
}
}
@@ -105,7 +121,7 @@ extends Logging {
}
}
if(!added) {
- AsyncProducerStats.droppedMessageRate.mark()
+ producerStats.droppedMessageRate.mark()
error("Event queue is full of unsent messages, could not send event: " + message.toString)
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
}else {
@@ -131,26 +147,27 @@ extends Logging {
}
@threadsafe
-class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
- val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+ val messageRate = newMeter(clientIdTopic + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
}
-object ProducerTopicStat {
- private val valueFactory = (k: String) => new ProducerTopicStat(k)
- private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
- private val allTopicStat = new ProducerTopicStat("AllTopics")
+class ProducerTopicStats(clientId: String) {
+ private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
+ private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
+ private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
- def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
+ def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
- def getProducerTopicStat(topic: String): ProducerTopicStat = {
- stats.getAndMaybePut(topic + "-")
+ def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
}
}
-object ProducerStats extends KafkaMetricsGroup {
- val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
- val resendRate = newMeter( "ResendsPerSec", "resends", TimeUnit.SECONDS)
- val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+ val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
+ val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
+ val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+ val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index eb8ead3..7e78c7e 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -26,13 +26,26 @@ import kafka.api.TopicMetadata
import kafka.common.UnavailableProducerException
-object ProducerPool{
- def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
+object ProducerPool {
+ /**
+ * Used in ProducerPool to initiate a SyncProducer connection with a broker.
+ */
+ def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
+ val props = new Properties()
+ props.put("host", broker.host)
+ props.put("port", broker.port.toString)
+ props.putAll(config.props.props)
+ new SyncProducer(new SyncProducerConfig(props))
+ }
+
+ /**
+ * Used in ClientUtils to send TopicMetadataRequest to a broker; only host, port and the client id are configured.
+ */
+ def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
- if(configOpt.isDefined)
- props.putAll(configOpt.get.props.props)
+ props.put("clientid", clientId) // key must match the one read by SyncProducerConfigShared.clientId
new SyncProducer(new SyncProducerConfig(props))
}
}
@@ -41,9 +54,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
private val syncProducers = new HashMap[Int, SyncProducer]
private val lock = new Object()
- def updateProducer(topicMetaDatas: Seq[TopicMetadata]) {
+ def updateProducer(topicMetadatas: Seq[TopicMetadata]) {
val newBrokers = new collection.mutable.HashSet[Broker]
- topicMetaDatas.foreach(tmd => {
+ topicMetadatas.foreach(tmd => {
tmd.partitionsMetadata.foreach(pmd => {
if(pmd.leader.isDefined)
newBrokers+=(pmd.leader.get)
@@ -53,9 +66,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
newBrokers.foreach(b => {
if(syncProducers.contains(b.id)){
syncProducers(b.id).close()
- syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+ syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
} else
- syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+ syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
})
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 15733bb..ea03d51 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -34,14 +34,12 @@ object SyncProducer {
*/
@threadsafe
class SyncProducer(val config: SyncProducerConfig) extends Logging {
-
- private val MaxConnectBackoffMs = 60000
- private var sentOnConnection = 0
private val lock = new Object()
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
config.bufferSize, config.requestTimeoutMs)
+ val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
trace("Instantiating Scala Sync Producer")
@@ -89,9 +87,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
* Send a message
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
- ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+ producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
var response: Receive = null
- ProducerRequestStat.requestTimer.time {
+ producerRequestStats.requestTimer.time {
response = doSend(producerRequest)
}
ProducerResponse.readFrom(response.buffer)
@@ -152,7 +150,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
}
}
-object ProducerRequestStat extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram("ProducerRequestSize")
+class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+ val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 6f6a3f3..3e3dc49 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -44,7 +44,7 @@ trait SyncProducerConfigShared {
val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
/* the client application sending the producer requests */
- val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
+ val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
/*
* The required acks of the producer requests - negative value means ack
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
index dd9078e..e69de29 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer.async
-
-import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
-
-object AsyncProducerStats extends KafkaMetricsGroup {
- val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 9be87d0..645402e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -33,7 +33,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val encoder: Encoder[V],
private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
- private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+ private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+ private val producerStats: ProducerStats,
+ private val producerTopicStats: ProducerTopicStats)
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
@@ -48,8 +50,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
serializedData.foreach{
keyed =>
val dataSize = keyed.message.payloadSize
- ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
- ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+ producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+ producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
@@ -61,11 +63,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
remainingRetries -= 1
- ProducerStats.resendRate.mark()
+ producerStats.resendRate.mark()
}
}
if(outstandingProduceRequests.size > 0) {
- ProducerStats.failedSendRate.mark()
+ producerStats.failedSendRate.mark()
error("Failed to send the following requests: " + outstandingProduceRequests)
throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
}
@@ -111,7 +113,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
} catch {
case t =>
- ProducerStats.serializationErrorRate.mark()
+ producerStats.serializationErrorRate.mark()
if (isSync) {
throw t
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index c900c45..2b39cab 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -28,12 +28,13 @@ class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[KeyedMessage[K,V]],
val handler: EventHandler[K,V],
val queueTime: Long,
- val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
+ val batchSize: Int,
+ val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
- newGauge("ProducerQueueSize-" + getId,
+ newGauge(clientId + "-ProducerQueueSize-" + getId,
new Gauge[Int] {
def getValue = queue.size
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/serializer/Decoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 6fc3b1d..54d0b93 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -17,7 +17,6 @@
package kafka.serializer
-import kafka.message._
import kafka.utils.VerifiableProperties
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/serializer/Encoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala
index fa9067f..020e73c 100644
--- a/core/src/main/scala/kafka/serializer/Encoder.scala
+++ b/core/src/main/scala/kafka/serializer/Encoder.scala
@@ -18,8 +18,6 @@
package kafka.serializer
import kafka.utils.VerifiableProperties
-import kafka.message._
-import kafka.utils.Utils
/**
* An encoder is a method of turning objects into byte arrays.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index c2cc3cb..e4520a4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -27,7 +27,7 @@ import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, ShutdownableThread}
+import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@@ -38,12 +38,13 @@ import java.util.concurrent.locks.ReentrantLock
abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
extends ShutdownableThread(name) {
-
private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
- val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
- val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+ val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
+ val fetcherStats = new FetcherStats(clientId)
+ val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
+ val fetcherLagStats = new FetcherLagStats(clientId)
/* callbacks to be defined in subclass */
@@ -117,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
case None => currentOffset.get
}
partitionMap.put(topicAndPartition, newOffset)
- FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+ fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
fetcherMetrics.byteRate.mark(validBytes)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentOffset.get, partitionData)
@@ -182,10 +183,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
-class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
+class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
private[this] var lagVal = new AtomicLong(-1L)
newGauge(
- name._1 + "-" + name._2 + "-ConsumerLag",
+ clientIdTopicPartition + "-ConsumerLag",
new Gauge[Long] {
def getValue = lagVal.get
}
@@ -198,25 +199,29 @@ class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
def lag = lagVal.get
}
-object FetcherLagMetrics {
- private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
- private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(clientId: String) {
+ private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+ private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
- def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
- stats.getAndMaybePut( (topic, partitionId) )
+ def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
+ stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
}
}
-class FetcherStat(name: String) extends KafkaMetricsGroup {
- val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS)
- val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+ val requestRate = newMeter(clientIdTopic + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+ val byteRate = newMeter(clientIdTopic + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
}
-object FetcherStat {
- private val valueFactory = (k: String) => new FetcherStat(k)
- private val stats = new Pool[String, FetcherStat](Some(valueFactory))
+class FetcherStats(clientId: String) {
+ private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
+ private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
- def getFetcherStat(name: String): FetcherStat = {
- stats.getAndMaybePut(name)
+ def getFetcherStats(name: String): FetcherMetrics = {
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
}
}
+
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+ override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a14e0a2..cc04ed5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -237,8 +237,8 @@ class KafkaApis(val requestChannel: RequestChannel,
private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
trace("Append [%s] to local log ".format(partitionAndData.toString))
partitionAndData.map {case (topicAndPartition, messages) =>
- BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
- BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes)
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -255,8 +255,8 @@ class KafkaApis(val requestChannel: RequestChannel,
Runtime.getRuntime.halt(1)
null
case e =>
- BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).failedProduceRequestRate.mark()
- BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e)
new ProduceResult(topicAndPartition, e)
}
@@ -323,8 +323,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionData =
try {
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
- BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
- BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
if (!isFetchFromFollower) {
new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
} else {
@@ -334,8 +334,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case t: Throwable =>
- BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
- BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
+ BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), t)
new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
offset, -1L, MessageSet.Empty)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d949652..f0c05a5 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -79,14 +79,14 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
}
-object BrokerTopicStat extends Logging {
+object BrokerTopicStats extends Logging {
private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
- private val allTopicStat = new BrokerTopicMetrics("AllTopics")
+ private val allTopicStats = new BrokerTopicMetrics("AllTopics")
- def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat
+ def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats
- def getBrokerTopicStat(topic: String): BrokerTopicMetrics = {
+ def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
stats.getAndMaybePut(topic + "-")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e0a86b9..d444d22 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -23,7 +23,7 @@ import kafka.utils._
import java.util.concurrent._
import atomic.AtomicBoolean
import org.I0Itec.zkclient.ZkClient
-import kafka.controller.{ControllerStat, KafkaController}
+import kafka.controller.{ControllerStats, KafkaController}
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -96,9 +96,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
* Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
- BrokerTopicStat.getBrokerAllTopicStat()
- ControllerStat.offlinePartitionRate
- ControllerStat.uncleanLeaderElectionRate
+ BrokerTopicStats.getBrokerAllTopicStats()
+ ControllerStats.offlinePartitionRate
+ ControllerStats.uncleanLeaderElectionRate
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 40afcab..34166e4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String,
brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name,
- clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port) ,
+ clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs,
socketBufferSize = brokerConfig.replicaSocketBufferSize,
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 9ca4dc8..db9acc9 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -41,7 +41,7 @@ object ConsumerOffsetChecker extends Logging {
val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
val consumer = brokerInfo match {
case BrokerIpPattern(ip, port) =>
- Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
+ Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
case _ =>
error("Could not parse broker info %s".format(brokerInfo))
None
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index e78d53d..2b9438a 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -67,7 +67,7 @@ object GetOffsetShell {
val partition = options.valueOf(partitionOpt).intValue
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
- val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
+ val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index aa23559..5c4b3d2 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,7 +17,6 @@
package kafka.tools
-import kafka.message.Message
import joptsimple.OptionParser
import kafka.utils.{Utils, CommandLineUtils, Logging}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 79ffcc5..db14c82 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -24,7 +24,7 @@ import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{Logging, ZkUtils}
import kafka.api.OffsetRequest
-import kafka.message.{CompressionCodec, Message}
+import kafka.message.CompressionCodec
object ReplayLogProducer extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index e34a432..addd8db 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -19,12 +19,10 @@ package kafka.tools
import joptsimple._
import kafka.utils._
-import kafka.producer.ProducerConfig
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
-import java.util.Properties
import scala.collection.JavaConversions._
/**
@@ -127,7 +125,7 @@ object SimpleConsumerShell extends Logging {
// getting topic metadata
info("Getting topic metatdata...")
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1)
@@ -167,7 +165,7 @@ object SimpleConsumerShell extends Logging {
System.exit(1)
}
if(startingOffset < 0)
- startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false)
+ startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false)
// initializing formatter
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@@ -175,7 +173,7 @@ object SimpleConsumerShell extends Logging {
info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
.format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
- val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
+ val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
def run() {
var offset = startingOffset
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index fed7aad..111c9a8 100644
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -65,7 +65,7 @@ object UpdateOffsetsInZK {
ZkUtils.getBrokerInfo(zkClient, broker) match {
case Some(brokerInfo) =>
- val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+ val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
new file mode 100644
index 0000000..780339e
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import kafka.common.InvalidClientIdException
+import util.matching.Regex
+
+object ClientId { // Validation rules for client ids.
+ val legalChars = "[a-zA-Z0-9_-]" // Regex character class of permitted characters: ASCII letters, digits, '_' and '-'.
+ val maxNameLength = 200 // to prevent hitting filename max length limit
+ private val rgx = new Regex(legalChars + "*") // Zero or more legal characters, so the empty string also matches.
+
+ def validate(clientId: String) { // Throws InvalidClientIdException when clientId is too long or contains an illegal character; otherwise returns normally.
+ if (clientId.length > maxNameLength)
+ throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
+
+ rgx.findFirstIn(clientId) match { // Take the first run of legal characters; it must cover the whole input.
+ case Some(t) =>
+ if (!t.equals(clientId)) // A match shorter than the input means an illegal character is present.
+ throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+ case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -") // NOTE(review): with the `*` quantifier an empty match is always possible, so this branch looks unreachable - confirm.
+ }
+ }
+}
+
+object Topic { // Validation rules for topic names.
+ val legalChars = "[a-zA-Z0-9_-]" // Regex character class of permitted characters: ASCII letters, digits, '_' and '-'.
+ val maxNameLength = 255 // Longest topic name accepted by validate.
+ private val rgx = new Regex(legalChars + "+") // One or more legal characters.
+
+ def validate(topic: String) { // Throws InvalidTopicException when topic is empty, too long, or contains an illegal character; otherwise returns normally.
+ if (topic.length <= 0)
+ throw new InvalidTopicException("topic name is illegal, can't be empty")
+ else if (topic.length > maxNameLength)
+ throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+
+ rgx.findFirstIn(topic) match { // Take the first run of legal characters; it must cover the whole name.
+ case Some(t) =>
+ if (!t.equals(topic)) // A match shorter than the name means an illegal character is present.
+ throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+ case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") // Reached when the name contains no legal character at all.
+ }
+ }
+}
+
+case class ClientIdAndTopic(clientId: String, topic:String) { // Value type pairing a client id with a topic name.
+ override def toString = "%s-%s".format(clientId, topic) // Renders as "<clientId>-<topic>".
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/main/scala/kafka/utils/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Topic.scala b/core/src/main/scala/kafka/utils/Topic.scala
index fe79adf..e69de29 100644
--- a/core/src/main/scala/kafka/utils/Topic.scala
+++ b/core/src/main/scala/kafka/utils/Topic.scala
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.common.InvalidTopicException
-import util.matching.Regex
-
-object Topic {
- val legalChars = "[a-zA-Z0-9_-]"
- val maxNameLength = 255
- private val rgx = new Regex(legalChars + "+")
-
- def validate(topic: String) {
- if (topic.length <= 0)
- throw new InvalidTopicException("topic name is illegal, can't be empty")
- else if (topic.length > maxNameLength)
- throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
- rgx.findFirstIn(topic) match {
- case Some(t) =>
- if (!t.equals(topic))
- throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
- case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index bd09d78..ab807a1 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -17,7 +17,6 @@
package kafka
-import message.Message
import org.apache.log4j.PropertyConfigurator
import kafka.utils.Logging
import serializer.Encoder
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 7d48458..5b72eed 100644
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -18,7 +18,6 @@
package kafka
import consumer._
-import message.Message
import utils.Utils
import java.util.concurrent.CountDownLatch