You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/06 00:43:43 UTC
kafka git commit: MINOR: follow-up KAFKA-2730 to use two tags for broker id and fetcher id combination
Repository: kafka
Updated Branches:
refs/heads/trunk bf2563e2f -> 0273c4379
MINOR: follow-up KAFKA-2730 to use two tags for broker id and fetcher id combination
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ismael Juma, Guozhang Wang
Closes #434 from guozhangwang/K2730-hotfix
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0273c437
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0273c437
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0273c437
Branch: refs/heads/trunk
Commit: 0273c4379f12d7c3daedc89b0838485270e16bf4
Parents: bf2563e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Nov 5 15:49:29 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 5 15:49:29 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/server/ReplicaFetcherManager.scala | 2 +-
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 8 +++++---
2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0273c437/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 779876b..96c2a38 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -32,7 +32,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManage
case Some(p) =>
"%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
}
- new ReplicaFetcherThread(threadName, sourceBroker, brokerConfig,
+ new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
replicaMgr, metrics, time)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0273c437/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index fad0e3d..745ea2e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -40,6 +40,7 @@ import scala.collection.{JavaConverters, Map, mutable}
import JavaConverters._
class ReplicaFetcherThread(name: String,
+ fetcherId: Int,
sourceBroker: BrokerEndPoint,
brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager,
@@ -65,8 +66,9 @@ class ReplicaFetcherThread(name: String,
private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
- // we need to include the full thread id composed of the broker and the thread index
- // as the metrics tag to avoid metric name conflicts with more than one thread to the same broker
+ // we need to include both the broker id and the fetcher id
+ // as the metrics tag to avoid metric name conflicts with
+ // more than one fetcher thread to the same broker
private val networkClient = {
val selector = new Selector(
NetworkReceive.UNLIMITED,
@@ -74,7 +76,7 @@ class ReplicaFetcherThread(name: String,
metrics,
time,
"replica-fetcher",
- Map("thread-id" -> name).asJava,
+ Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
false,
ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.values)
)