You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/03/02 17:55:04 UTC
kafka git commit: KAFKA4811;
ReplicaFetchThread may fail to create due to existing metric
Repository: kafka
Updated Branches:
refs/heads/trunk ef92bb4e0 -> 1b902b4ed
KAFKA4811; ReplicaFetchThread may fail to create due to existing metric
Have fetcherThreadMap keyed off brokerId + fetcherId instead of broker + fetcherId, but did not consider the case where port is changed.
Author: huxi <hu...@zhenrongbao.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #2606 from amethystic/kafka4811_ReplicaFetchThread_fail_create
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b902b4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b902b4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b902b4e
Branch: refs/heads/trunk
Commit: 1b902b4ed39e78066fab163d1b6d54dd435b1d7b
Parents: ef92bb4
Author: huxi <hu...@zhenrongbao.com>
Authored: Thu Mar 2 09:55:01 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 2 09:55:01 2017 -0800
----------------------------------------------------------------------
.../kafka/server/AbstractFetcherManager.scala | 29 ++++++++++++++------
.../kafka/server/AbstractFetcherThread.scala | 2 +-
2 files changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b902b4e/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 0a17f8e..2b2aa7b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
extends Logging with KafkaMetricsGroup {
// map of (source broker_id, fetcher_id per source broker) => fetcher
- private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
+ private val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = "[" + name + "] "
@@ -75,17 +75,26 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
mapLock synchronized {
val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
+
+ def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
+ val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
+ fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
+ fetcherThread.start
+ }
+
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
- var fetcherThread: AbstractFetcherThread = null
- fetcherThreadMap.get(brokerAndFetcherId) match {
- case Some(f) => fetcherThread = f
+ val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
+ fetcherThreadMap.get(brokerIdAndFetcherId) match {
+ case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port =>
+ // reuse the fetcher thread
+ case Some(f) =>
+ f.shutdown()
+ addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
case None =>
- fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
- fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
- fetcherThread.start
+ addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
}
- fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
+ fetcherThreadMap(brokerIdAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
tp -> brokerAndInitOffset.initOffset
})
}
@@ -105,7 +114,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
def shutdownIdleFetcherThreads() {
mapLock synchronized {
- val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
+ val keysToBeRemoved = new mutable.HashSet[BrokerIdAndFetcherId]
for ((key, fetcher) <- fetcherThreadMap) {
if (fetcher.partitionCount <= 0) {
fetcher.shutdown()
@@ -133,3 +142,5 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)
case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long)
+
+case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b902b4e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6462968..0eb3ad8 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.record.MemoryRecords
*/
abstract class AbstractFetcherThread(name: String,
clientId: String,
- sourceBroker: BrokerEndPoint,
+ val sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0,
isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {