You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/23 21:06:33 UTC
git commit: Add ReplicaFetcherThread name to mbean names; kafka-726;
patched by Swapnil Ghike; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 22cd8bfd6 -> 307b1f338
Add ReplicaFetcherThread name to mbean names; kafka-726; patched by Swapnil Ghike; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/307b1f33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/307b1f33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/307b1f33
Branch: refs/heads/0.8
Commit: 307b1f338ad9748809c9b782dc3f28263edcadf2
Parents: 22cd8bf
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 23 12:06:20 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 23 12:06:20 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 1 -
.../scala/kafka/server/AbstractFetcherThread.scala | 24 +++-----------
.../scala/kafka/server/ReplicaFetcherManager.scala | 2 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
4 files changed, 7 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 7968747..1bfabb0 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -33,7 +33,6 @@ object FetchRequest {
val CurrentVersion = 0.shortValue
val DefaultMaxWait = 0
val DefaultMinBytes = 0
- val ReplicaFetcherClientId = "replica-fetcher"
val DefaultCorrelationId = 0
def readFrom(buffer: ByteBuffer): FetchRequest = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3cba743..0b286f0 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -45,7 +45,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
val fetcherStats = new FetcherStats(metricId)
- val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
val fetcherLagStats = new FetcherLagStats(metricId)
/* callbacks to be defined in subclass */
@@ -100,7 +99,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
}
- fetcherMetrics.requestRate.mark()
+ fetcherStats.requestRate.mark()
if (response != null) {
// process fetched data
@@ -121,7 +120,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
partitionMap.put(topicAndPartition, newOffset)
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
- fetcherMetrics.byteRate.mark(validBytes)
+ fetcherStats.byteRate.mark(validBytes)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentOffset.get, partitionData)
case ErrorMapping.OffsetOutOfRangeCode =>
@@ -219,22 +218,9 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
}
}
-class FetcherMetrics(metricId: ClientIdBrokerTopic) extends KafkaMetricsGroup {
- val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
- val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
-}
-
-class FetcherStats(metricId: ClientIdAndBroker) {
- private val valueFactory = (k: ClientIdBrokerTopic) => new FetcherMetrics(k)
- private val stats = new Pool[ClientIdBrokerTopic, FetcherMetrics](Some(valueFactory))
-
- def getFetcherStats(name: String): FetcherMetrics = {
- stats.getAndMaybePut(new ClientIdBrokerTopic(metricId.clientId, metricId.brokerInfo, name))
- }
-}
-
-case class ClientIdBrokerTopic(clientId: String, brokerInfo: String, topic: String) {
- override def toString = "%s-%s-%s".format(clientId, brokerInfo, topic)
+class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+ val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+ val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
}
case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 9f696dd..7f775ec 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
- new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+ new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
}
def shutdown() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 79b3fa3..c03f758 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -27,7 +27,7 @@ class ReplicaFetcherThread(name:String,
brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name,
- clientId = FetchRequest.ReplicaFetcherClientId,
+ clientId = name,
sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs,
socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,