You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/06 00:43:43 UTC

kafka git commit: MINOR: follow-up KAFKA-2730 to use two tags for broker id and fetcher id combination

Repository: kafka
Updated Branches:
  refs/heads/trunk bf2563e2f -> 0273c4379


MINOR: follow-up KAFKA-2730 to use two tags for broker id and fetcher id combination

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #434 from guozhangwang/K2730-hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0273c437
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0273c437
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0273c437

Branch: refs/heads/trunk
Commit: 0273c4379f12d7c3daedc89b0838485270e16bf4
Parents: bf2563e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Nov 5 15:49:29 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 5 15:49:29 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaFetcherManager.scala | 2 +-
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala  | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0273c437/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 779876b..96c2a38 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -32,7 +32,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManage
       case Some(p) =>
         "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
     }
-    new ReplicaFetcherThread(threadName, sourceBroker, brokerConfig,
+    new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
       replicaMgr, metrics, time)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0273c437/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fad0e3d..745ea2e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -40,6 +40,7 @@ import scala.collection.{JavaConverters, Map, mutable}
 import JavaConverters._
 
 class ReplicaFetcherThread(name: String,
+                           fetcherId: Int,
                            sourceBroker: BrokerEndPoint,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager,
@@ -65,8 +66,9 @@ class ReplicaFetcherThread(name: String,
 
   private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
 
-  // we need to include the full thread id composed of the broker and the thread index
-  // as the metrics tag to avoid metric name conflicts with more than one thread to the same broker
+  // we need to include both the broker id and the fetcher id
+  // as metrics tags to avoid metric name conflicts when
+  // more than one fetcher thread connects to the same broker
   private val networkClient = {
     val selector = new Selector(
       NetworkReceive.UNLIMITED,
@@ -74,7 +76,7 @@ class ReplicaFetcherThread(name: String,
       metrics,
       time,
       "replica-fetcher",
-      Map("thread-id" -> name).asJava,
+      Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
       false,
       ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.values)
     )