You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/03/02 17:55:04 UTC

kafka git commit: KAFKA4811; ReplicaFetchThread may fail to create due to existing metric

Repository: kafka
Updated Branches:
  refs/heads/trunk ef92bb4e0 -> 1b902b4ed


KAFKA4811; ReplicaFetchThread may fail to create due to existing metric

Have fetcherThreadMap keyed off brokerId + fetcherId instead of broker + fetcherId, but did not consider the case where port is changed.

Author: huxi <hu...@zhenrongbao.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2606 from amethystic/kafka4811_ReplicaFetchThread_fail_create


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b902b4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b902b4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b902b4e

Branch: refs/heads/trunk
Commit: 1b902b4ed39e78066fab163d1b6d54dd435b1d7b
Parents: ef92bb4
Author: huxi <hu...@zhenrongbao.com>
Authored: Thu Mar 2 09:55:01 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 2 09:55:01 2017 -0800

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherManager.scala   | 29 ++++++++++++++------
 .../kafka/server/AbstractFetcherThread.scala    |  2 +-
 2 files changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1b902b4e/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 0a17f8e..2b2aa7b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
 abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
   // map of (source broker_id, fetcher_id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
@@ -75,17 +75,26 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
     mapLock synchronized {
       val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
         BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
+
+      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
+        val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
+        fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
+        fetcherThread.start
+      }
+
       for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
-        var fetcherThread: AbstractFetcherThread = null
-        fetcherThreadMap.get(brokerAndFetcherId) match {
-          case Some(f) => fetcherThread = f
+        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
+        fetcherThreadMap.get(brokerIdAndFetcherId) match {
+          case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port =>
+            // reuse the fetcher thread
+          case Some(f) =>
+            f.shutdown()
+            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
           case None =>
-            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
-            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
-            fetcherThread.start
+            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
         }
 
-        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
+        fetcherThreadMap(brokerIdAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
           tp -> brokerAndInitOffset.initOffset
         })
       }
@@ -105,7 +114,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 
   def shutdownIdleFetcherThreads() {
     mapLock synchronized {
-      val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
+      val keysToBeRemoved = new mutable.HashSet[BrokerIdAndFetcherId]
       for ((key, fetcher) <- fetcherThreadMap) {
         if (fetcher.partitionCount <= 0) {
           fetcher.shutdown()
@@ -133,3 +142,5 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)
 
 case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long)
+
+case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b902b4e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6462968..0eb3ad8 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.record.MemoryRecords
  */
 abstract class AbstractFetcherThread(name: String,
                                      clientId: String,
-                                     sourceBroker: BrokerEndPoint,
+                                     val sourceBroker: BrokerEndPoint,
                                      fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {