You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/23 21:06:33 UTC

git commit: Add ReplicaFetcherThread name to mbean names; kafka-726; patched by Swapnil Ghike; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 22cd8bfd6 -> 307b1f338


Add ReplicaFetcherThread name to mbean names; kafka-726; patched by Swapnil Ghike; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/307b1f33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/307b1f33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/307b1f33

Branch: refs/heads/0.8
Commit: 307b1f338ad9748809c9b782dc3f28263edcadf2
Parents: 22cd8bf
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 23 12:06:20 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 23 12:06:20 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |    1 -
 .../scala/kafka/server/AbstractFetcherThread.scala |   24 +++-----------
 .../scala/kafka/server/ReplicaFetcherManager.scala |    2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    2 +-
 4 files changed, 7 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 7968747..1bfabb0 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -33,7 +33,6 @@ object FetchRequest {
   val CurrentVersion = 0.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
-  val ReplicaFetcherClientId = "replica-fetcher"
   val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3cba743..0b286f0 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -45,7 +45,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
   val fetcherStats = new FetcherStats(metricId)
-  val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
   val fetcherLagStats = new FetcherLagStats(metricId)
 
   /* callbacks to be defined in subclass */
@@ -100,7 +99,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           }
         }
     }
-    fetcherMetrics.requestRate.mark()
+    fetcherStats.requestRate.mark()
 
     if (response != null) {
       // process fetched data
@@ -121,7 +120,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   }
                   partitionMap.put(topicAndPartition, newOffset)
                   fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                  fetcherMetrics.byteRate.mark(validBytes)
+                  fetcherStats.byteRate.mark(validBytes)
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                 case ErrorMapping.OffsetOutOfRangeCode =>
@@ -219,22 +218,9 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
   }
 }
 
-class FetcherMetrics(metricId: ClientIdBrokerTopic) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
-}
-
-class FetcherStats(metricId: ClientIdAndBroker) {
-  private val valueFactory = (k: ClientIdBrokerTopic) => new FetcherMetrics(k)
-  private val stats = new Pool[ClientIdBrokerTopic, FetcherMetrics](Some(valueFactory))
-
-  def getFetcherStats(name: String): FetcherMetrics = {
-    stats.getAndMaybePut(new ClientIdBrokerTopic(metricId.clientId, metricId.brokerInfo, name))
-  }
-}
-
-case class ClientIdBrokerTopic(clientId: String, brokerInfo: String, topic: String) {
-  override def toString = "%s-%s-%s".format(clientId, brokerInfo, topic)
+class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
 
 case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 9f696dd..7f775ec 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
   }
 
   def shutdown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/307b1f33/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 79b3fa3..c03f758 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -27,7 +27,7 @@ class ReplicaFetcherThread(name:String,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId,
+                                clientId = name,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,