Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/05 16:36:29 UTC

[GitHub] [kafka] rondagostino opened a new pull request #10069: MINOR: Add RaftReplicaManager

rondagostino opened a new pull request #10069:
URL: https://github.com/apache/kafka/pull/10069


   This adds the logic to apply partition metadata when consuming from the Raft-based metadata log.
   
   `RaftReplicaManager` extends `ReplicaManager` for now to minimize changes to existing code for the 2.8 release.  We will likely adjust this hierarchy at a later time (e.g. introducing a trait and adding a helper to refactor common code).  For now, we expose the necessary fields and methods in `ReplicaManager` by changing their scope from `private` to `protected`, and we refactor out a couple of pieces of logic that are shared between the two implementations (stopping replicas and adding log dir fetchers).
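
A minimal sketch of the scoping approach described above, using made-up stand-in classes rather than the real Kafka ones. `BaseManager`, `RaftManager`, `handleRemoved` and `stoppedCount` are hypothetical names; only the idea of a `protected` shared helper called from a subclass comes from the description.

```scala
object ScopeSketch {
  import scala.collection.mutable

  // Hypothetical stand-in for the existing manager class.
  class BaseManager {
    // Was `private`; `protected` lets a subclass reach it.
    protected val stopped: mutable.Set[String] = mutable.Set[String]()

    // Shared logic factored into one helper that both implementations call.
    protected def stopPartitions(names: Iterable[String]): Unit = {
      stopped ++= names
      ()
    }

    def stoppedCount: Int = stopped.size
  }

  // Hypothetical stand-in for the Raft-based subclass.
  class RaftManager extends BaseManager {
    def handleRemoved(names: Iterable[String]): Unit = stopPartitions(names)
  }
}
```

The helper stays invisible to callers outside the hierarchy, which is the main difference from simply making it public.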
   
   Existing tests are sufficient to expose regressions in the current `ReplicaManager`.
   
   We intend to exercise the new `RaftReplicaManager` code via system tests and unit/integration tests (both to come in later PRs).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#issuecomment-775521260


   Thanks for this PR, @rondagostino !
   
   I can see why you wanted to have `RaftReplicaChangeDelegateHelper`.  The `ReplicaManager` is not very easy to unit test because it has grown so large.  I don't think this delegate thing is quite the right abstraction here-- it's pretty confusing-- but I guess let's revisit this after 2.8 is finished.
   
   I suppose one option is, once `ReplicaManager` is a pure interface, we can split the kip-500 update logic off into a separate set of functions that takes a `ReplicaManager` as an input.  Then we can easily unit-test the update logic with a `MockReplicaManager`.
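
One way the option above could look, as a rough sketch with simplified stand-in types. `ReplicaManagerLike`, `applyPartitionChanges` and the string-based partition names are assumptions made for illustration; the real `ReplicaManager` is not a pure interface in this PR.

```scala
object UpdateLogicSketch {
  import scala.collection.mutable

  // Assumed minimal interface; the real class exposes far more.
  trait ReplicaManagerLike {
    def localBrokerId: Int
    def makeLeader(topicPartition: String): Unit
    def makeFollower(topicPartition: String): Unit
  }

  // Update logic as a free-standing function that takes the manager as input.
  // `leaders` maps a partition name to the id of its leader broker.
  def applyPartitionChanges(rm: ReplicaManagerLike, leaders: Map[String, Int]): Unit =
    leaders.foreach { case (topicPartition, leaderId) =>
      if (leaderId == rm.localBrokerId) rm.makeLeader(topicPartition)
      else rm.makeFollower(topicPartition)
    }

  // Hand-written mock that only records which calls were made.
  final class MockReplicaManager(val localBrokerId: Int) extends ReplicaManagerLike {
    val leaders: mutable.Set[String] = mutable.Set[String]()
    val followers: mutable.Set[String] = mutable.Set[String]()
    def makeLeader(topicPartition: String): Unit = { leaders += topicPartition; () }
    def makeFollower(topicPartition: String): Unit = { followers += topicPartition; () }
  }
}
```

A unit test can then build a `MockReplicaManager`, call `applyPartitionChanges`, and inspect the recorded sets without starting a broker.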
   
   For now I left two small comments... LGTM after those are addressed.





[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573328630



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -185,7 +185,9 @@ object HostedPartition {
    * This state only applies to brokers that are using a Raft-based metadata
    * quorum; it never happens when using ZooKeeper.
    */
-  final case class Deferred(partition: Partition) extends NonOffline
+  final case class Deferred(partition: Partition,
+                            isNew: Boolean,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit) extends NonOffline

Review comment:
       I don't think this callback is logically part of the `HostedPartition` object itself.  We don't want a different callback for each partition.  It will be the same one for each (in practice it just hooks into the group manager and one other manager that I forget).  So let's just supply this callback in whatever function it is needed in, not store it in the `HostedPartition` object.
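
A small sketch of the suggestion above, assuming simplified stand-in types. `applyDeferred`, `localLeaders` and the two-field `Deferred` here are hypothetical; only the callback's shape, `(Iterable[Partition], Iterable[Partition]) => Unit`, is taken from the diff.

```scala
object CallbackSketch {
  // Simplified stand-in for kafka.cluster.Partition.
  final case class Partition(topic: String, index: Int)

  // The deferred state holds per-partition data only; no callback is stored.
  final case class Deferred(partition: Partition, isNew: Boolean)

  type OnLeadershipChange = (Iterable[Partition], Iterable[Partition]) => Unit

  // The single shared callback is passed to the function that needs it.
  def applyDeferred(deferred: Seq[Deferred],
                    localLeaders: Set[Partition],
                    onLeadershipChange: OnLeadershipChange): Unit = {
    val (leaders, followers) = deferred.map(_.partition).partition(localLeaders.contains)
    onLeadershipChange(leaders, followers)
  }
}
```

With this shape there is no need to group partitions by callback before invoking it, since exactly one callback is in play per call.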







[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572433038



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -30,7 +31,8 @@ import scala.jdk.CollectionConverters._
 
 
 object MetadataPartition {
-  def apply(name: String, record: PartitionRecord): MetadataPartition = {
+  val OffsetNeverDeferred = 0L // must not be a valid offset we could see (i.e. must not be positive)

Review comment:
       Can you add JavaDoc for this?
   
   Also, what about `NoDeferredOffset` as a name?
   
   One last question... why is this 0 and not -1?  0 is a valid offset in the log, whereas -1 is not.
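
To make the concern concrete, here is a sketch of a sentinel that cannot collide with a real log offset. The name `NoDeferredOffset` is the one proposed above; the value `-1L` and the helper `wasDeferred` are assumptions for illustration, not necessarily what the PR ended up with.

```scala
object OffsetSentinelSketch {
  /**
   * Sentinel meaning "this partition was never deferred".
   * Log offsets start at 0, so a negative value can never be a real offset.
   */
  val NoDeferredOffset: Long = -1L

  // Hypothetical helper: true when the stored value is a real offset.
  def wasDeferred(offsetDeferredAt: Long): Boolean =
    offsetDeferredAt != NoDeferredOffset
}
```

With `0L` as the sentinel, a partition deferred at the very first offset of the log would be indistinguishable from one that was never deferred.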







[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573910513



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: RaftMetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManagers: QuotaManagers,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: RaftMetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
+           alterIsrManager: AlterIsrManager,
+           configRepository: ConfigRepository,
+           threadNamePrefix: Option[String] = None) = {
+    this(config, metrics, time, scheduler, logManager, isShuttingDown,
+      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
+      threadNamePrefix, configRepository, alterIsrManager)
+  }
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = raftReplicaManager.completeDelayedFetchOrProduceRequests(topicPartition)
+
+    override def config: KafkaConfig = raftReplicaManager.config
+
+    override def error(msg: => String, e: => Throwable): Unit = raftReplicaManager.error(msg, e)
+
+    override def getLogDir(topicPartition: TopicPartition): Option[String] = raftReplicaManager.getLogDir(topicPartition)
+
+    override def initialFetchOffset(log: Log): Long = raftReplicaManager.initialFetchOffset(log)
+
+    override def isShuttingDown: Boolean = raftReplicaManager.isShuttingDown.get
+
+    override def markDeferred(state: HostedPartition.Deferred): Unit = raftReplicaManager.markPartitionDeferred(state)
+
+    override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+
+    override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
+
+    override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager
+
+    override def stateChangeLogger: StateChangeLogger = raftReplicaManager.stateChangeLogger
+  }
+
+  // visible/overwriteable for testing, generally will not change otherwise
+  private[server] var delegate = new RaftReplicaChangeDelegate(new RaftReplicaManagerChangeDelegateHelper(this))
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    val partitionsMadeFollower = mutable.Set[Partition]()
+    val partitionsMadeLeader = mutable.Set[Partition]()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val leadershipChangeCallbacks = {
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      }
+      val metadataImage = metadataCache.currentImage()
+      val brokers = metadataImage.brokers
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+        }
+
+        val partitionsMadeLeader = if (leaderPartitionStates.nonEmpty)
+          delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+        else
+          Set.empty[Partition]
+        val partitionsMadeFollower = if (followerPartitionStates.nonEmpty)
+          delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
+        else
+          Set.empty[Partition]
+
+        // We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
+        // We also need to identify the leadership change callback(s) to invoke
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          val topicPartition = partition.topicPartition
+          // identify for callback if necessary
+          if (state.leaderId == localBrokerId) {
+            if (partitionsMadeLeader.contains(partition)) {
+              leadershipChangeCallbacks.getOrElseUpdate(
+                deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._1 += partition
+            }
+          } else if (partitionsMadeFollower.contains(partition)) {
+            leadershipChangeCallbacks.getOrElseUpdate(
+              deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._2 += partition
+          }
+          // transition from Deferred to Online
+          allPartitions.put(topicPartition, HostedPartition.Online(partition))
+        }
+
+        updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+        maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        leadershipChangeCallbacks.forKeyValue { (onLeadershipChange, leaderAndFollowerPartitions) =>
+          onLeadershipChange(leaderAndFollowerPartitions._1, leaderAndFollowerPartitions._2)
+        }
+      } catch {
+        case e: Throwable =>
+          deferredPartitionsIterator.foreach { metadata =>
+            val partition = metadata.partition
+            val state = cachedState(metadataImage, partition)
+            val topicPartition = partition.topicPartition
+            val leader = state.leaderId == localBrokerId
+            val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+          }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+          throw e
+      }
+      deferringMetadataChanges = false
+    }
+    val endMs = time.milliseconds()
+    val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions: " +
+      s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)" +
+      s" in $elapsedMs ms")
+    stateChangeLogger.info("Metadata changes are no longer being deferred")
+  }
+
+  /**
+   * Handle changes made by a batch of metadata log records.
+   *
+   * @param imageBuilder       The MetadataImage builder.
+   * @param metadataOffset     The last offset in the batch of records.
+   * @param onLeadershipChange The callbacks to invoke when leadership changes.
+   */
+  def handleMetadataRecords(imageBuilder: MetadataImageBuilder,
+                            metadataOffset: Long,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+    val startMs = time.milliseconds()
+    val builder = imageBuilder.partitionsBuilder()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(("Metadata batch %d: %d local partition(s) changed, %d " +
+        "local partition(s) removed.").format(metadataOffset, builder.localChanged().size,
+        builder.localRemoved().size))
+      if (stateChangeLogger.isTraceEnabled) {
+        builder.localChanged().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally changed: ${state}")
+        }
+        builder.localRemoved().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally removed: ${state}")
+        }
+      }
+      if (deferringMetadataChanges) {
+        val prevPartitions = imageBuilder.prevImage.partitions
+        // partitionChangesToBeDeferred maps each partition to be deferred to whether it is new (i.e. did not exist before deferral began)
+        val partitionChangesToBeDeferred = mutable.HashMap[Partition, Boolean]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val (partition, priorDeferredMetadata) = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              (None, None)
+
+            case HostedPartition.Online(partition) => (Some(partition), None)
+            case deferred@HostedPartition.Deferred(partition, _, _) => (Some(partition), Some(deferred))
+
+            case HostedPartition.None =>
+              // Create the partition instance since it does not yet exist
+              (Some(Partition(topicPartition, time, configRepository, this)), None)
+          }
+          partition.foreach { partition =>
+            val isNew = priorDeferredMetadata match {
+              case Some(alreadyDeferred) => alreadyDeferred.isNew
+              case _ => prevPartitions.topicPartition(topicPartition.topic(), topicPartition.partition()).isEmpty
+            }
+            partitionChangesToBeDeferred.put(partition, isNew)
+          }
+        }
+
+        stateChangeLogger.info(s"Deferring metadata changes for ${partitionChangesToBeDeferred.size} partition(s)")
+        if (partitionChangesToBeDeferred.nonEmpty) {
+          delegate.makeDeferred(partitionChangesToBeDeferred, metadataOffset, onLeadershipChange)
+        }
+      } else { // not deferring changes, so make leaders/followers accordingly
+        val partitionsToBeLeader = mutable.HashMap[Partition, MetadataPartition]()
+        val partitionsToBeFollower = mutable.HashMap[Partition, MetadataPartition]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val partition = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              None
+
+            case HostedPartition.Online(partition) => Some(partition)
+            case _: HostedPartition.Deferred => throw new IllegalStateException(
+              s"There should never be deferred partition metadata when we aren't deferring changes: $topicPartition")
+
+            case HostedPartition.None =>
+              // it's a partition that we don't know about yet, so create it and mark it online
+              val partition = Partition(topicPartition, time, configRepository, this)
+              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+              Some(partition)
+          }
+          partition.foreach { partition =>
+            if (currentState.leaderId == localBrokerId) {
+              partitionsToBeLeader.put(partition, currentState)
+            } else {
+              partitionsToBeFollower.put(partition, currentState)
+            }
+          }
+        }
+
+        val prevPartitions = imageBuilder.prevImage.partitions
+        val changedPartitionsPreviouslyExisting = mutable.Set[MetadataPartition]()
+        builder.localChanged().foreach(metadataPartition =>
+          prevPartitions.topicPartition(metadataPartition.topicName, metadataPartition.partitionIndex).foreach(
+            changedPartitionsPreviouslyExisting.add))
+        val nextBrokers = imageBuilder.nextBrokers()

Review comment:
       Thanks for the explanation!
   
   Let's just rename this to "brokers" and then I think we're good.  (Everything in the builder is "next" after all :) )







[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571257684



##########
File path: core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
##########
@@ -84,7 +84,7 @@ class DelayedDeleteRecords(delayMs: Long,
                 (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
             }
 
-          case HostedPartition.Deferred(_) =>
+          case HostedPartition.Deferred(_, _, _, _, _) =>

Review comment:
       It's cleaner to use `case _: HostedPartition.Deferred`. Otherwise, every time a parameter is added, you have to update this for no good reason.
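
A self-contained sketch of the difference, using a toy hierarchy rather than the real `HostedPartition`. The names `Missing`, `Online`, `Deferred` and `describe` below are hypothetical stand-ins.

```scala
object PatternSketch {
  sealed trait HostedPartition
  case object Missing extends HostedPartition
  final case class Online(name: String) extends HostedPartition
  // Adding or removing fields here does not affect the typed pattern below.
  final case class Deferred(name: String, isNew: Boolean) extends HostedPartition

  def describe(hp: HostedPartition): String = hp match {
    // Typed pattern: matches on the class only, independent of field count.
    case _: Deferred  => "deferred"
    // Constructor pattern: appropriate when a field is actually used.
    case Online(name) => s"online:$name"
    case Missing      => "missing"
  }
}
```

A constructor pattern such as `Deferred(_, _)` would stop compiling as soon as a third field was added, even though the match never reads any field.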







[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571818172



##########
File path: core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.metadata.{CachedConfigRepository, RaftMetadataCache}
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.metrics.Metrics
+import org.easymock.EasyMock

Review comment:
       Can we use `Mockito` instead of `EasyMock`?







[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571818015



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -439,71 +414,85 @@ class ReplicaManager(val config: KafkaConfig,
                 responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
               }
 
-            case HostedPartition.Deferred(_) =>
+            case _: HostedPartition.Deferred =>
               throw new IllegalStateException("We should never be deferring partition metadata changes and stopping a replica when using ZooKeeper")
 
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition -> deletePartition
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        // First stop fetchers for all partitions.
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
-
-        // Second remove deleted partitions from the partition map. Fetchers rely on the
-        // ReplicaManager to get Partition's information so they must be stopped first.
-        val deletedPartitions = mutable.Set.empty[TopicPartition]
-        stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
-          if (partitionState.deletePartition) {
-            getPartition(topicPartition) match {
-              case hostedPartition@HostedPartition.Online(partition) =>
-                if (allPartitions.remove(topicPartition, hostedPartition)) {
-                  maybeRemoveTopicMetrics(topicPartition.topic)
-                  // Logs are not deleted here. They are deleted in a single batch later on.
-                  // This is done to avoid having to checkpoint for every deletions.
-                  partition.delete()
-                }
-
-              case _ =>
-            }
-
-            deletedPartitions += topicPartition
-          }
-
-          // If we were the leader, we may have some operations still waiting for completion.
-          // We force completion to prevent them from timing out.
-          completeDelayedFetchOrProduceRequests(topicPartition)
-        }
-
-        // Third delete the logs and checkpoint.
-        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
-          exception match {
-            case e: KafkaStorageException =>
+        stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
               stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                 "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-
-            case e =>
-              stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
+          } else {
+            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
                 s"${e.getClass.getName} exception: ${e.getMessage}")
               responseMap.put(topicPartition, Errors.forException(e))
           }
-        })
-
+          responseMap.put(topicPartition, Errors.forException(e))
+        }
         (responseMap, Errors.NONE)
       }
     }
   }
 
+  /**
+   * Stop the given partitions.
+   *
+   * @param partitionsToStop    A map from a topic partition to a boolean indicating
+   *                            whether the partition should be deleted.
+   *
+   * @return                    A map from partitions to exceptions which occurred.
+   *                            If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+    // First stop fetchers for all partitions.
+    val partitions = partitionsToStop.keySet
+    replicaFetcherManager.removeFetcherForPartitions(partitions)
+    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+    // Second remove deleted partitions from the partition map. Fetchers rely on the
+    // ReplicaManager to get Partition's information so they must be stopped first.
+    val partitionsToDelete = mutable.Set.empty[TopicPartition]
+    partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
+      if (shouldDelete) {
+        getPartition(topicPartition) match {
+          case hostedPartition: NonOffline => // Online or Deferred (Deferred never occurs when using ZooKeeper)
+            if (allPartitions.remove(topicPartition, hostedPartition)) {
+              maybeRemoveTopicMetrics(topicPartition.topic)
+              // Logs are not deleted here. They are deleted in a single batch later on.
+              // This is done to avoid having to checkpoint for every deletion.
+              hostedPartition.partition.delete()
+            }
+
+          case _ =>
+        }
+        partitionsToDelete += topicPartition
+      }
+      // If we were the leader, we may have some operations still waiting for completion.
+      // We force completion to prevent them from timing out.
+      completeDelayedFetchOrProduceRequests(topicPartition)
+    }
+
+    // Third delete the logs and checkpoint.
+    val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    if (partitionsToDelete.nonEmpty) {
+      // Delete the logs and checkpoint. Confusingly, this function isn't actually
+      // asynchronous-- it just synchronously schedules the directories to be deleted later.

Review comment:
       This comment is a bit confusing. The function synchronously schedules the deletion, and the directories are then deleted asynchronously, so why is the function not asynchronous? I think you mean that it doesn't return a future or something like that, but that is not a requirement for a function to be asynchronous, right?
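
To make the distinction in the comment above concrete, here is a minimal sketch of a method that returns `Unit` rather than a future yet is still asynchronous in effect: it schedules work on the caller's thread and the work itself runs later on a background thread. `SketchLogManager` and `scheduleDeletion` are hypothetical names chosen for illustration; they are not Kafka's `LogManager` API, and the counter only stands in for real directory removal.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a log manager; not Kafka's LogManager.
class SketchLogManager {
  private val executor = Executors.newSingleThreadExecutor()
  val deleted = new AtomicInteger(0)

  // Returns Unit, not a Future: the scheduling happens synchronously on the
  // caller's thread, while each deletion runs later on the background thread.
  def scheduleDeletion(dirs: Seq[String], done: CountDownLatch): Unit = {
    dirs.foreach { dir =>
      executor.submit(new Runnable {
        override def run(): Unit = {
          deleted.incrementAndGet() // stands in for removing `dir` from disk
          done.countDown()
        }
      })
    }
  }

  def shutdown(): Unit = executor.shutdown()
}

object SketchLogManagerDemo {
  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(2)
    val logManager = new SketchLogManager
    logManager.scheduleDeletion(Seq("topic-0", "topic-1"), done)
    // The call above has already returned; wait for the background work.
    done.await(5, TimeUnit.SECONDS)
    logManager.shutdown()
  }
}
```

Under this reading, the caller only learns that deletion completed through some side channel (here the latch), which may be what the original code comment was trying to convey.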







[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573855674



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: RaftMetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManagers: QuotaManagers,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: RaftMetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
+           alterIsrManager: AlterIsrManager,
+           configRepository: ConfigRepository,
+           threadNamePrefix: Option[String] = None) = {
+    this(config, metrics, time, scheduler, logManager, isShuttingDown,
+      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
+      threadNamePrefix, configRepository, alterIsrManager)
+  }
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = raftReplicaManager.completeDelayedFetchOrProduceRequests(topicPartition)
+
+    override def config: KafkaConfig = raftReplicaManager.config
+
+    override def error(msg: => String, e: => Throwable): Unit = raftReplicaManager.error(msg, e)
+
+    override def getLogDir(topicPartition: TopicPartition): Option[String] = raftReplicaManager.getLogDir(topicPartition)
+
+    override def initialFetchOffset(log: Log): Long = raftReplicaManager.initialFetchOffset(log)
+
+    override def isShuttingDown: Boolean = raftReplicaManager.isShuttingDown.get
+
+    override def markDeferred(state: HostedPartition.Deferred): Unit = raftReplicaManager.markPartitionDeferred(state)
+
+    override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+
+    override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
+
+    override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager
+
+    override def stateChangeLogger: StateChangeLogger = raftReplicaManager.stateChangeLogger
+  }
+
+  // visible/overwriteable for testing, generally will not change otherwise
+  private[server] var delegate = new RaftReplicaChangeDelegate(new RaftReplicaManagerChangeDelegateHelper(this))
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    val partitionsMadeFollower = mutable.Set[Partition]()
+    val partitionsMadeLeader = mutable.Set[Partition]()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val leadershipChangeCallbacks = {
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      }
+      val metadataImage = metadataCache.currentImage()
+      val brokers = metadataImage.brokers
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+        }
+
+        // Accumulate into the sets declared before the lock. Redeclaring them here with `val`
+        // would shadow those sets and leave the summary log lines reporting zero partitions.
+        if (leaderPartitionStates.nonEmpty)
+          partitionsMadeLeader ++= delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+        if (followerPartitionStates.nonEmpty)
+          partitionsMadeFollower ++= delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
+
+        // Any partition that is still Deferred (i.e. was not moved to Offline) must now transition to Online.
+        // We also need to identify the leadership change callback(s) to invoke
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          val topicPartition = partition.topicPartition
+          // identify for callback if necessary
+          if (state.leaderId == localBrokerId) {
+            if (partitionsMadeLeader.contains(partition)) {
+              leadershipChangeCallbacks.getOrElseUpdate(
+                deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._1 += partition
+            }
+          } else if (partitionsMadeFollower.contains(partition)) {
+            leadershipChangeCallbacks.getOrElseUpdate(
+              deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._2 += partition
+          }
+          // transition from Deferred to Online
+          allPartitions.put(topicPartition, HostedPartition.Online(partition))
+        }
+
+        updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+        maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        leadershipChangeCallbacks.forKeyValue { (onLeadershipChange, leaderAndFollowerPartitions) =>
+          onLeadershipChange(leaderAndFollowerPartitions._1, leaderAndFollowerPartitions._2)
+        }
+      } catch {
+        case e: Throwable =>
+          deferredPartitionsIterator.foreach { metadata =>
+            val partition = metadata.partition
+            val state = cachedState(metadataImage, partition)
+            val topicPartition = partition.topicPartition
+            val leader = state.leaderId == localBrokerId
+            val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+          }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+          throw e
+      }
+      deferringMetadataChanges = false
+    }
+    val endMs = time.milliseconds()
+    val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions: " +
+      s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)" +
+      s" in $elapsedMs ms")
+    stateChangeLogger.info("Metadata changes are no longer being deferred")
+  }
+
+  /**
+   * Handle changes made by a batch of metadata log records.
+   *
+   * @param imageBuilder       The MetadataImage builder.
+   * @param metadataOffset     The last offset in the batch of records.
+   * @param onLeadershipChange The callbacks to invoke when leadership changes.
+   */
+  def handleMetadataRecords(imageBuilder: MetadataImageBuilder,
+                            metadataOffset: Long,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+    val startMs = time.milliseconds()
+    val builder = imageBuilder.partitionsBuilder()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(("Metadata batch %d: %d local partition(s) changed, %d " +
+        "local partition(s) removed.").format(metadataOffset, builder.localChanged().size,
+        builder.localRemoved().size))
+      if (stateChangeLogger.isTraceEnabled) {
+        builder.localChanged().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally changed: ${state}")
+        }
+        builder.localRemoved().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally removed: ${state}")
+        }
+      }
+      if (deferringMetadataChanges) {
+        val prevPartitions = imageBuilder.prevImage.partitions
+        // partitionChangesToBeDeferred maps each partition to be deferred to whether it is new (i.e. did not exist before deferral began)
+        val partitionChangesToBeDeferred = mutable.HashMap[Partition, Boolean]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val (partition, priorDeferredMetadata) = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              (None, None)
+
+            case HostedPartition.Online(partition) => (Some(partition), None)
+            case deferred@HostedPartition.Deferred(partition, _, _) => (Some(partition), Some(deferred))
+
+            case HostedPartition.None =>
+              // Create the partition instance since it does not yet exist
+              (Some(Partition(topicPartition, time, configRepository, this)), None)
+          }
+          partition.foreach { partition =>
+            val isNew = priorDeferredMetadata match {
+              case Some(alreadyDeferred) => alreadyDeferred.isNew
+              case _ => prevPartitions.topicPartition(topicPartition.topic(), topicPartition.partition()).isEmpty
+            }
+            partitionChangesToBeDeferred.put(partition, isNew)
+          }
+        }
+
+        stateChangeLogger.info(s"Deferring metadata changes for ${partitionChangesToBeDeferred.size} partition(s)")
+        if (partitionChangesToBeDeferred.nonEmpty) {
+          delegate.makeDeferred(partitionChangesToBeDeferred, metadataOffset, onLeadershipChange)
+        }
+      } else { // not deferring changes, so make leaders/followers accordingly
+        val partitionsToBeLeader = mutable.HashMap[Partition, MetadataPartition]()
+        val partitionsToBeFollower = mutable.HashMap[Partition, MetadataPartition]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val partition = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              None
+
+            case HostedPartition.Online(partition) => Some(partition)
+            case _: HostedPartition.Deferred => throw new IllegalStateException(
+              s"There should never be deferred partition metadata when we aren't deferring changes: $topicPartition")
+
+            case HostedPartition.None =>
+              // it's a partition that we don't know about yet, so create it and mark it online
+              val partition = Partition(topicPartition, time, configRepository, this)
+              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+              Some(partition)
+          }
+          partition.foreach { partition =>
+            if (currentState.leaderId == localBrokerId) {
+              partitionsToBeLeader.put(partition, currentState)
+            } else {
+              partitionsToBeFollower.put(partition, currentState)
+            }
+          }
+        }
+
+        val prevPartitions = imageBuilder.prevImage.partitions
+        val changedPartitionsPreviouslyExisting = mutable.Set[MetadataPartition]()
+        builder.localChanged().foreach(metadataPartition =>
+          prevPartitions.topicPartition(metadataPartition.topicName, metadataPartition.partitionIndex).foreach(
+            changedPartitionsPreviouslyExisting.add))
+        val nextBrokers = imageBuilder.nextBrokers()

Review comment:
       > would prefer to just give the ReplicaManager access to the BrokersBuilder and put the relevant accessors on that. Otherwise, we create a bunch of objects that get thrown away (since nextBrokers will have to be called again later).
   
   The point in time at which this invocation is occurring is when we are telling `RaftReplicaManager` to process the partition changes from a batch.  This happens after the batch has been otherwise handled by the metadata listener.  I think the most common occurrence for any batch is that there are no broker metadata changes (i.e. no broker has registered/unregistered or fenced/unfenced), in which case invoking `nextBrokers()` simply returns the previous brokers -- we don't create any objects.  If we make the change I think you are suggesting and `RaftReplicaManager` were to invoke `imageBuilder.brokersBuilder().build()` then we would actually invoke `MetadataBrokers(log, newBrokerMap)`, which is much more expensive.  Maybe you are suggesting `MetadataBrokersBuilder` should remember that it was already built?  It sort of does in that any attempt to mutate it after it is built will fail (since its map is now unmodifiable), but calling `build()` a second time could be made more efficient than it is now.
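
As a rough illustration of the "remember that it was already built" idea, the sketch below shows a builder that returns the previous image untouched when nothing changed and caches the result of the first `build()` call. `SketchBrokers` and `SketchBrokersBuilder` are hypothetical types invented for this example; they are not the classes in `kafka.server.metadata`, and the real builder may differ in every detail.

```scala
// Hypothetical types for illustration; not kafka.server.metadata classes.
final case class SketchBrokers(byId: Map[Int, String])

class SketchBrokersBuilder(prev: SketchBrokers) {
  private var changes: Map[Int, String] = Map.empty
  private var built: Option[SketchBrokers] = None
  var buildCount = 0 // how many new images were actually constructed

  def set(id: Int, host: String): Unit = {
    require(built.isEmpty, "builder is frozen once built")
    changes += (id -> host)
  }

  // Returns the previous image as-is when nothing changed, and remembers
  // the first result so that later calls do not rebuild it.
  def build(): SketchBrokers = built.getOrElse {
    val result =
      if (changes.isEmpty) prev
      else { buildCount += 1; SketchBrokers(prev.byId ++ changes) }
    built = Some(result)
    result
  }
}
```

With a cache like this, a second `build()` would cost a single `Option` check, so it would matter less whether the caller goes through the builder or through an accessor such as `nextBrokers()`.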
   







[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573916465



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: RaftMetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManagers: QuotaManagers,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: RaftMetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
+           alterIsrManager: AlterIsrManager,
+           configRepository: ConfigRepository,
+           threadNamePrefix: Option[String] = None) = {
+    this(config, metrics, time, scheduler, logManager, isShuttingDown,
+      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
+      threadNamePrefix, configRepository, alterIsrManager)
+  }
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = raftReplicaManager.completeDelayedFetchOrProduceRequests(topicPartition)
+
+    override def config: KafkaConfig = raftReplicaManager.config
+
+    override def error(msg: => String, e: => Throwable): Unit = raftReplicaManager.error(msg, e)
+
+    override def getLogDir(topicPartition: TopicPartition): Option[String] = raftReplicaManager.getLogDir(topicPartition)
+
+    override def initialFetchOffset(log: Log): Long = raftReplicaManager.initialFetchOffset(log)
+
+    override def isShuttingDown: Boolean = raftReplicaManager.isShuttingDown.get
+
+    override def markDeferred(state: HostedPartition.Deferred): Unit = raftReplicaManager.markPartitionDeferred(state)
+
+    override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+
+    override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
+
+    override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager
+
+    override def stateChangeLogger: StateChangeLogger = raftReplicaManager.stateChangeLogger
+  }
+
+  // visible/overwriteable for testing, generally will not change otherwise
+  private[server] var delegate = new RaftReplicaChangeDelegate(new RaftReplicaManagerChangeDelegateHelper(this))
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    val partitionsMadeFollower = mutable.Set[Partition]()
+    val partitionsMadeLeader = mutable.Set[Partition]()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val leadershipChangeCallbacks = {
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      }
+      val metadataImage = metadataCache.currentImage()
+      val brokers = metadataImage.brokers
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+        }
+
+        // Accumulate into the sets declared before the lock. Redeclaring them here with `val`
+        // would shadow those sets and leave the summary log lines reporting zero partitions.
+        if (leaderPartitionStates.nonEmpty)
+          partitionsMadeLeader ++= delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+        if (followerPartitionStates.nonEmpty)
+          partitionsMadeFollower ++= delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
+
+        // Any partition that is still Deferred (i.e. was not moved to Offline) must now transition to Online.
+        // We also need to identify the leadership change callback(s) to invoke
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          val topicPartition = partition.topicPartition
+          // identify for callback if necessary
+          if (state.leaderId == localBrokerId) {
+            if (partitionsMadeLeader.contains(partition)) {
+              leadershipChangeCallbacks.getOrElseUpdate(
+                deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._1 += partition
+            }
+          } else if (partitionsMadeFollower.contains(partition)) {
+            leadershipChangeCallbacks.getOrElseUpdate(
+              deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._2 += partition
+          }
+          // transition from Deferred to Online
+          allPartitions.put(topicPartition, HostedPartition.Online(partition))
+        }
+
+        updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+        maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        leadershipChangeCallbacks.forKeyValue { (onLeadershipChange, leaderAndFollowerPartitions) =>
+          onLeadershipChange(leaderAndFollowerPartitions._1, leaderAndFollowerPartitions._2)
+        }
+      } catch {
+        case e: Throwable =>
+          deferredPartitionsIterator.foreach { metadata =>
+            val partition = metadata.partition
+            val state = cachedState(metadataImage, partition)
+            val topicPartition = partition.topicPartition
+            val leader = state.leaderId == localBrokerId
+            val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+          }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+          throw e
+      }
+      deferringMetadataChanges = false
+    }
+    val endMs = time.milliseconds()
+    val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions: " +
+      s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s) " +
+      s"in $elapsedMs ms")
+    stateChangeLogger.info("Metadata changes are no longer being deferred")
+  }
+
+  /**
+   * Handle changes made by a batch of metadata log records.
+   *
+   * @param imageBuilder       The MetadataImage builder.
+   * @param metadataOffset     The last offset in the batch of records.
+   * @param onLeadershipChange The callbacks to invoke when leadership changes.
+   */
+  def handleMetadataRecords(imageBuilder: MetadataImageBuilder,
+                            metadataOffset: Long,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+    val startMs = time.milliseconds()
+    val builder = imageBuilder.partitionsBuilder()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(("Metadata batch %d: %d local partition(s) changed, %d " +
+        "local partition(s) removed.").format(metadataOffset, builder.localChanged().size,
+        builder.localRemoved().size))
+      if (stateChangeLogger.isTraceEnabled) {
+        builder.localChanged().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally changed: ${state}")
+        }
+        builder.localRemoved().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally removed: ${state}")
+        }
+      }
+      if (deferringMetadataChanges) {
+        val prevPartitions = imageBuilder.prevImage.partitions
+        // partitionChangesToBeDeferred maps each partition to be deferred to whether it is new (i.e. did not exist before deferral began)
+        val partitionChangesToBeDeferred = mutable.HashMap[Partition, Boolean]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val (partition, priorDeferredMetadata) = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              (None, None)
+
+            case HostedPartition.Online(partition) => (Some(partition), None)
+            case deferred@HostedPartition.Deferred(partition, _, _) => (Some(partition), Some(deferred))
+
+            case HostedPartition.None =>
+              // Create the partition instance since it does not yet exist
+              (Some(Partition(topicPartition, time, configRepository, this)), None)
+          }
+          partition.foreach { partition =>
+            val isNew = priorDeferredMetadata match {
+              case Some(alreadyDeferred) => alreadyDeferred.isNew
+              case _ => prevPartitions.topicPartition(topicPartition.topic(), topicPartition.partition()).isEmpty
+            }
+            partitionChangesToBeDeferred.put(partition, isNew)
+          }
+        }
+
+        stateChangeLogger.info(s"Deferring metadata changes for ${partitionChangesToBeDeferred.size} partition(s)")
+        if (partitionChangesToBeDeferred.nonEmpty) {
+          delegate.makeDeferred(partitionChangesToBeDeferred, metadataOffset, onLeadershipChange)
+        }
+      } else { // not deferring changes, so make leaders/followers accordingly
+        val partitionsToBeLeader = mutable.HashMap[Partition, MetadataPartition]()
+        val partitionsToBeFollower = mutable.HashMap[Partition, MetadataPartition]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val partition = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              None
+
+            case HostedPartition.Online(partition) => Some(partition)
+            case _: HostedPartition.Deferred => throw new IllegalStateException(
+              s"There should never be deferred partition metadata when we aren't deferring changes: $topicPartition")
+
+            case HostedPartition.None =>
+              // it's a partition that we don't know about yet, so create it and mark it online
+              val partition = Partition(topicPartition, time, configRepository, this)
+              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+              Some(partition)
+          }
+          partition.foreach { partition =>
+            if (currentState.leaderId == localBrokerId) {
+              partitionsToBeLeader.put(partition, currentState)
+            } else {
+              partitionsToBeFollower.put(partition, currentState)
+            }
+          }
+        }
+
+        val prevPartitions = imageBuilder.prevImage.partitions
+        val changedPartitionsPreviouslyExisting = mutable.Set[MetadataPartition]()
+        builder.localChanged().foreach(metadataPartition =>
+          prevPartitions.topicPartition(metadataPartition.topicName, metadataPartition.partitionIndex).foreach(
+            changedPartitionsPreviouslyExisting.add))
+        val nextBrokers = imageBuilder.nextBrokers()

Review comment:
       As discussed offline, will keep it like it is but rename `nextBrokers()` to simply `brokers()`.







[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572433443



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -39,11 +41,30 @@ object MetadataPartition {
       record.isr(),
       Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
       Collections.emptyList(),
-      Collections.emptyList())
+      Collections.emptyList(),
+      largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred),
+      isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
       Hmm... why do we need this boolean?  Can't we just check if `largestDeferredOffsetEverSeen` is not `OffsetNeverDeferred`?







[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571237871



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.log.LogManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, MetadataBrokers, MetadataImageBuilder, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: MetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val partitionsMadeFollower = mutable.Set[Partition]()
+      val partitionsMadeLeader = mutable.Set[Partition]()
+      val leadershipChangeCallbacks =
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        val mostRecentMetadataOffsets = mutable.Map[Partition, Long]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val state = deferredPartition.metadata
+          val partition = deferredPartition.partition
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+          mostRecentMetadataOffsets.put(partition, deferredPartition.mostRecentMetadataOffset)
+        }
+
+        val partitionsMadeLeader = makeLeaders(partitionsAlreadyExisting, leaderPartitionStates,
+          highWatermarkCheckpoints,-1, mostRecentMetadataOffsets)
+        val partitionsMadeFollower = makeFollowers(partitionsAlreadyExisting,
+          createMetadataBrokersFromCurrentCache, followerPartitionStates,
+          highWatermarkCheckpoints, -1, mostRecentMetadataOffsets)
+
+        // We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
+        // We also need to identify the leadership change callback(s) to invoke
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val state = deferredPartition.metadata
+          val partition = deferredPartition.partition
+          val topicPartition = partition.topicPartition
+          // identify for callback if necessary
+          if (state.leaderId == localBrokerId) {
+            if (partitionsMadeLeader.contains(partition)) {
+              leadershipChangeCallbacks.getOrElseUpdate(
+                deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._1 += partition
+            }
+          } else if (partitionsMadeFollower.contains(partition)) {
+            leadershipChangeCallbacks.getOrElseUpdate(
+              deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._2 += partition
+          }
+          // transition from Deferred to Online
+          allPartitions.put(topicPartition, HostedPartition.Online(partition))
+        }
+
+        updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+        maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        leadershipChangeCallbacks.forKeyValue { (onLeadershipChange, leaderAndFollowerPartitions) =>
+          onLeadershipChange(leaderAndFollowerPartitions._1, leaderAndFollowerPartitions._2)
+        }
+      } catch {
+        case e: Throwable =>
+          deferredPartitionsIterator.foreach { metadata =>
+            val state = metadata.metadata
+            val partition = metadata.partition
+            val topicPartition = partition.topicPartition
+            val mostRecentMetadataOffset = metadata.mostRecentMetadataOffset
+            val leader = state.leaderId == localBrokerId
+            val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition last seen in metadata batch $mostRecentMetadataOffset"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+          }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+          throw e

Review comment:
       > If we fail to apply changes, I guess we have to see that as a fatal error? The only possible way of recovering would be to replay the changes.
   
   Yeah, I think so.  We may need to put effort into minimizing the blast radius of these failures.

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.log.LogManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, MetadataBrokers, MetadataImageBuilder, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: MetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val partitionsMadeFollower = mutable.Set[Partition]()
+      val partitionsMadeLeader = mutable.Set[Partition]()
+      val leadershipChangeCallbacks =
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        val mostRecentMetadataOffsets = mutable.Map[Partition, Long]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val state = deferredPartition.metadata
+          val partition = deferredPartition.partition
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+          mostRecentMetadataOffsets.put(partition, deferredPartition.mostRecentMetadataOffset)
+        }
+
+        val partitionsMadeLeader = makeLeaders(partitionsAlreadyExisting, leaderPartitionStates,
+          highWatermarkCheckpoints,-1, mostRecentMetadataOffsets)
+        val partitionsMadeFollower = makeFollowers(partitionsAlreadyExisting,
+          createMetadataBrokersFromCurrentCache, followerPartitionStates,

Review comment:
       > Do we have any guarantee that the metadata cache is in a state that is consistent with the deferred changes?
   
   Good question.  Metadata changes up through the point of applying the deferred partition metadata changes should be applied to the metadata cache at this point.
   
   One thing we need to think about is the fact that we currently don't defer metadata cache changes at all.  The metadata cache will contain partition states that are ahead of ReplicaManager during the time when ReplicaManager is deferring its changes.  This means, for example, that the following will reflect deferred partition changes that have been applied to the metadata cache but that have not been applied to ReplicaManager.  We may have to write test cases for each of these conditions so we can be clear on what the expected behavior should be.
   
   - MetadataRequest
   - FindCoordinatorRequest
   - ElectLeadersRequest with topicPartitions = null
   - DelayedCreatePartitions (in topic purgatory)
   - DelayedElectLeader (in elect leader purgatory)
   - Anything that calls ReplicaManager.fetchMessages() and DelayedFetch (in fetch purgatory), though these seem okay since they wait until they can get enough data?
   - TransactionMarkerChannelManager.addTxnMarkersToBrokerQueue
   - DescribeConfigsRequest
   - OffsetCommitRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
   - ProduceRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
   - FetchRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
   - DeleteRecordsRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
   - AddPartitionsToTxnRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
   - TxnOffsetCommitRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
   - OffsetDeleteRequest (whether or not to send UNKNOWN_TOPIC_OR_PARTITION)
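   
   To make the concern concrete, here is a minimal sketch of the divergence, under stated assumptions: `ToyMetadataCache`, `ToyReplicaManager`, and `PartitionState` are hypothetical stand-ins invented for illustration and are not the classes in this PR.  The cache applies every update immediately, while the replica manager parks updates until deferral ends, so a reader of the cache can see a leader that the replica manager has not yet acted on.
   
   ```scala
   import scala.collection.mutable

   object DeferredViewSketch {
     // Hypothetical, simplified partition state: only the leader id is modelled.
     final case class PartitionState(leaderId: Int)

     // Toy stand-in for the metadata cache: every update is visible immediately.
     final class ToyMetadataCache {
       private val states = mutable.Map[String, PartitionState]()
       def update(tp: String, state: PartitionState): Unit = states.update(tp, state)
       def leader(tp: String): Option[Int] = states.get(tp).map(_.leaderId)
     }

     // Toy stand-in for the replica manager: updates are parked while deferring.
     final class ToyReplicaManager {
       private val applied = mutable.Map[String, PartitionState]()
       private val deferred = mutable.Map[String, PartitionState]()
       private var deferring = true

       def update(tp: String, state: PartitionState): Unit =
         if (deferring) deferred.update(tp, state) else applied.update(tp, state)

       // Apply everything that was parked, then stop deferring.
       def endDeferral(): Unit = {
         applied ++= deferred
         deferred.clear()
         deferring = false
       }

       def leader(tp: String): Option[Int] = applied.get(tp).map(_.leaderId)
     }
   }
   ```
   
   Between an update and `endDeferral()`, the two `leader` lookups disagree for the same partition.  That window is what the request types listed above could observe, which is why each one probably deserves its own test case.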
   







[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572442503



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -39,11 +41,30 @@ object MetadataPartition {
       record.isr(),
       Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
       Collections.emptyList(),
-      Collections.emptyList())
+      Collections.emptyList(),
+      largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred),
+      isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
       We basically use `largestDeferredOffsetEverSeen` only for logging at this point -- we also check it in a few `private def sanityCheckState...()` `RaftReplicaManager` methods.  We could completely eliminate `largestDeferredOffsetEverSeen` if we didn't want to log when the partition was last deferred.  It just tracks the most recent offset at which a change to the partition was deferred rather than directly applied.  Once the partition is no longer deferred, the value remains whatever it was and the boolean flips to `false`.
   
   It does seem on the surface that we could change the declaration to `deferredSinceOffset` and get rid of the boolean -- and `deferredSinceOffset` would change to `-1` once those changes are applied.  But there is a problem with this if the partition changes to not being deferred in the metadata cache before we ask `RaftReplicaManager` to process all of its deferred changes: the value will be -1 in the metadata cache under those circumstances, and we wouldn't have the value to log.
   
   So I think we have a few options.
   
   1. Do the logging, apply the changes to the metadata cache before replica manager, and keep the `Long` and `Boolean` as currently defined.
   2. Do the logging, apply the changes to the metadata cache **after** replica manager, and use just a `Long` (with the semantics being changed as described above).
   3. Just use a Boolean and don't do the logging.
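   
   A rough sketch of the difference between options 1 and 2, for illustration only: the case classes `LongAndBoolean` and `SingleLong` and their methods are hypothetical, the sentinel value `-1` is taken from the description above, and treating `deferredSinceOffset` as the first deferred offset is an assumption.
   
   ```scala
   object DeferralTracking {
     // Sentinel named after the constant in the diff; the value -1 is assumed.
     val OffsetNeverDeferred: Long = -1L

     // Option 1: a Long plus a Boolean. The offset survives once deferral ends.
     final case class LongAndBoolean(
         largestDeferredOffsetEverSeen: Long = OffsetNeverDeferred,
         isCurrentlyDeferringChanges: Boolean = false) {

       def defer(offset: Long): LongAndBoolean =
         copy(
           largestDeferredOffsetEverSeen = math.max(largestDeferredOffsetEverSeen, offset),
           isCurrentlyDeferringChanges = true)

       // Only the flag flips; the offset is still available for logging.
       def applyDeferred(): LongAndBoolean = copy(isCurrentlyDeferringChanges = false)
     }

     // Option 2: a single Long that is reset when the deferred changes are applied.
     final case class SingleLong(deferredSinceOffset: Long = OffsetNeverDeferred) {
       def isDeferred: Boolean = deferredSinceOffset != OffsetNeverDeferred

       // Assumption: "since" means the first offset at which deferral started.
       def defer(offset: Long): SingleLong =
         if (isDeferred) this else copy(deferredSinceOffset = offset)

       // The offset is gone after this call, so it must be logged beforehand.
       def applyDeferred(): SingleLong = copy(deferredSinceOffset = OffsetNeverDeferred)
     }
   }
   ```
   
   With `SingleLong`, anything that reads the value after `applyDeferred()` sees only the sentinel, which is why option 2 depends on the replica manager processing its deferred changes before the metadata cache is updated.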
   







[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573326890



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: RaftMetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  def this(config: KafkaConfig,
+           metrics: Metrics,
+           time: Time,
+           scheduler: Scheduler,
+           logManager: LogManager,
+           isShuttingDown: AtomicBoolean,
+           quotaManagers: QuotaManagers,
+           brokerTopicStats: BrokerTopicStats,
+           metadataCache: RaftMetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
+           alterIsrManager: AlterIsrManager,
+           configRepository: ConfigRepository,
+           threadNamePrefix: Option[String] = None) = {
+    this(config, metrics, time, scheduler, logManager, isShuttingDown,
+      quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+      DelayedOperationPurgatory[DelayedProduce](
+        purgatoryName = "Produce", brokerId = config.brokerId,
+        purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedFetch](
+        purgatoryName = "Fetch", brokerId = config.brokerId,
+        purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedDeleteRecords](
+        purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+        purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
+      threadNamePrefix, configRepository, alterIsrManager)
+  }
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = raftReplicaManager.completeDelayedFetchOrProduceRequests(topicPartition)
+
+    override def config: KafkaConfig = raftReplicaManager.config
+
+    override def error(msg: => String, e: => Throwable): Unit = raftReplicaManager.error(msg, e)
+
+    override def getLogDir(topicPartition: TopicPartition): Option[String] = raftReplicaManager.getLogDir(topicPartition)
+
+    override def initialFetchOffset(log: Log): Long = raftReplicaManager.initialFetchOffset(log)
+
+    override def isShuttingDown: Boolean = raftReplicaManager.isShuttingDown.get
+
+    override def markDeferred(state: HostedPartition.Deferred): Unit = raftReplicaManager.markPartitionDeferred(state)
+
+    override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+
+    override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
+
+    override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager
+
+    override def stateChangeLogger: StateChangeLogger = raftReplicaManager.stateChangeLogger
+  }
+
+  // visible/overwriteable for testing, generally will not change otherwise
+  private[server] var delegate = new RaftReplicaChangeDelegate(new RaftReplicaManagerChangeDelegateHelper(this))
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    val partitionsMadeFollower = mutable.Set[Partition]()
+    val partitionsMadeLeader = mutable.Set[Partition]()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val leadershipChangeCallbacks = {
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      }
+      val metadataImage = metadataCache.currentImage()
+      val brokers = metadataImage.brokers
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+        }
+
+        // Add to the sets declared above rather than shadowing them, so the summary log lines see the results
+        if (leaderPartitionStates.nonEmpty)
+          partitionsMadeLeader ++= delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+        if (followerPartitionStates.nonEmpty)
+          partitionsMadeFollower ++= delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
+
+        // We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
+        // We also need to identify the leadership change callback(s) to invoke
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val partition = deferredPartition.partition
+          val state = cachedState(metadataImage, partition)
+          val topicPartition = partition.topicPartition
+          // identify for callback if necessary
+          if (state.leaderId == localBrokerId) {
+            if (partitionsMadeLeader.contains(partition)) {
+              leadershipChangeCallbacks.getOrElseUpdate(
+                deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._1 += partition
+            }
+          } else if (partitionsMadeFollower.contains(partition)) {
+            leadershipChangeCallbacks.getOrElseUpdate(
+              deferredPartition.onLeadershipChange, (mutable.Set(), mutable.Set()))._2 += partition
+          }
+          // transition from Deferred to Online
+          allPartitions.put(topicPartition, HostedPartition.Online(partition))
+        }
+
+        updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+        maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        leadershipChangeCallbacks.forKeyValue { (onLeadershipChange, leaderAndFollowerPartitions) =>
+          onLeadershipChange(leaderAndFollowerPartitions._1, leaderAndFollowerPartitions._2)
+        }
+      } catch {
+        case e: Throwable =>
+          deferredPartitionsIterator.foreach { metadata =>
+            val partition = metadata.partition
+            val state = cachedState(metadataImage, partition)
+            val topicPartition = partition.topicPartition
+            val leader = state.leaderId == localBrokerId
+            val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+          }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+          throw e
+      }
+      deferringMetadataChanges = false
+    }
+    val endMs = time.milliseconds()
+    val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions: " +
+      s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s) " +
+      s"in $elapsedMs ms")
+    stateChangeLogger.info("Metadata changes are no longer being deferred")
+  }
+
+  /**
+   * Handle changes made by a batch of metadata log records.
+   *
+   * @param imageBuilder       The MetadataImage builder.
+   * @param metadataOffset     The last offset in the batch of records.
+   * @param onLeadershipChange The callbacks to invoke when leadership changes.
+   */
+  def handleMetadataRecords(imageBuilder: MetadataImageBuilder,
+                            metadataOffset: Long,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+    val startMs = time.milliseconds()
+    val builder = imageBuilder.partitionsBuilder()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(("Metadata batch %d: %d local partition(s) changed, %d " +
+        "local partition(s) removed.").format(metadataOffset, builder.localChanged().size,
+        builder.localRemoved().size))
+      if (stateChangeLogger.isTraceEnabled) {
+        builder.localChanged().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally changed: ${state}")
+        }
+        builder.localRemoved().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally removed: ${state}")
+        }
+      }
+      if (deferringMetadataChanges) {
+        val prevPartitions = imageBuilder.prevImage.partitions
+        // partitionChangesToBeDeferred maps each partition to be deferred to whether it is new (i.e. did not exist before deferral began)
+        val partitionChangesToBeDeferred = mutable.HashMap[Partition, Boolean]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val (partition, priorDeferredMetadata) = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              (None, None)
+
+            case HostedPartition.Online(partition) => (Some(partition), None)
+            case deferred@HostedPartition.Deferred(partition, _, _) => (Some(partition), Some(deferred))
+
+            case HostedPartition.None =>
+              // Create the partition instance since it does not yet exist
+              (Some(Partition(topicPartition, time, configRepository, this)), None)
+          }
+          partition.foreach { partition =>
+            val isNew = priorDeferredMetadata match {
+              case Some(alreadyDeferred) => alreadyDeferred.isNew
+              case _ => prevPartitions.topicPartition(topicPartition.topic(), topicPartition.partition()).isEmpty
+            }
+            partitionChangesToBeDeferred.put(partition, isNew)
+          }
+        }
+
+        stateChangeLogger.info(s"Deferring metadata changes for ${partitionChangesToBeDeferred.size} partition(s)")
+        if (partitionChangesToBeDeferred.nonEmpty) {
+          delegate.makeDeferred(partitionChangesToBeDeferred, metadataOffset, onLeadershipChange)
+        }
+      } else { // not deferring changes, so make leaders/followers accordingly
+        val partitionsToBeLeader = mutable.HashMap[Partition, MetadataPartition]()
+        val partitionsToBeFollower = mutable.HashMap[Partition, MetadataPartition]()
+        builder.localChanged().foreach { currentState =>
+          val topicPartition = currentState.toTopicPartition
+          val partition = getPartition(topicPartition) match {
+            case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+              None
+
+            case HostedPartition.Online(partition) => Some(partition)
+            case _: HostedPartition.Deferred => throw new IllegalStateException(
+              s"There should never be deferred partition metadata when we aren't deferring changes: $topicPartition")
+
+            case HostedPartition.None =>
+              // it's a partition that we don't know about yet, so create it and mark it online
+              val partition = Partition(topicPartition, time, configRepository, this)
+              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+              Some(partition)
+          }
+          partition.foreach { partition =>
+            if (currentState.leaderId == localBrokerId) {
+              partitionsToBeLeader.put(partition, currentState)
+            } else {
+              partitionsToBeFollower.put(partition, currentState)
+            }
+          }
+        }
+
+        val prevPartitions = imageBuilder.prevImage.partitions
+        val changedPartitionsPreviouslyExisting = mutable.Set[MetadataPartition]()
+        builder.localChanged().foreach(metadataPartition =>
+          prevPartitions.topicPartition(metadataPartition.topicName, metadataPartition.partitionIndex).foreach(
+            changedPartitionsPreviouslyExisting.add))
+        val nextBrokers = imageBuilder.nextBrokers()

Review comment:
       Rather than doing this, I would prefer to give the ReplicaManager access to the BrokersBuilder and put the relevant accessors on that.  Otherwise, we create a bunch of objects that get thrown away (since nextBrokers will have to be called again later).  It is a small thing, but it would be good to establish a good pattern here.
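
The pattern suggested in this comment could look roughly like the sketch below. All names here (`Broker`, `BrokersImage`, `BrokersBuilder` and its methods) are simplified, hypothetical stand-ins and are not taken from Kafka's actual classes; the point is only that a builder can answer lookups from its pending changes, so a caller does not have to build a throwaway image in the middle of a batch.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for illustration; not Kafka's actual classes.
final case class Broker(id: Int, host: String)

// Immutable snapshot of the brokers in a metadata image.
final case class BrokersImage(byId: Map[Int, Broker])

// Accumulates changes on top of a previous image without copying it up front.
final class BrokersBuilder(prev: BrokersImage) {
  private val changed = mutable.Map[Int, Broker]()
  private val removed = mutable.Set[Int]()

  def add(broker: Broker): Unit = {
    removed -= broker.id
    changed(broker.id) = broker
  }

  def remove(id: Int): Unit = {
    changed -= id
    removed += id
    ()
  }

  // Accessor answered from the pending changes plus the previous image,
  // so callers need not materialize a new image just to look up a broker.
  def get(id: Int): Option[Broker] =
    if (removed.contains(id)) None
    else changed.get(id).orElse(prev.byId.get(id))

  // Builds the next immutable image; intended to be called once per batch.
  def build(): BrokersImage = BrokersImage((prev.byId -- removed) ++ changed)
}
```

In this sketch a caller would use `get` while processing the batch and leave `build()` to whoever publishes the final image.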







[GitHub] [kafka] cmccabe commented on pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#issuecomment-776317812


   Thanks, @rondagostino.  I left two small comments; LGTM after those are addressed.





[GitHub] [kafka] cmccabe merged pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10069:
URL: https://github.com/apache/kafka/pull/10069


   





[GitHub] [kafka] ijuma commented on pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#issuecomment-774732997


   All builds are green!





[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571309365



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.log.LogManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, MetadataBrokers, MetadataImageBuilder, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+                         metrics: Metrics,
+                         time: Time,
+                         scheduler: Scheduler,
+                         logManager: LogManager,
+                         isShuttingDown: AtomicBoolean,
+                         quotaManagers: QuotaManagers,
+                         brokerTopicStats: BrokerTopicStats,
+                         metadataCache: MetadataCache,
+                         logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+  }
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    val startMs = time.milliseconds()
+    replicaStateChangeLock synchronized {
+      stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+      val partitionsMadeFollower = mutable.Set[Partition]()
+      val partitionsMadeLeader = mutable.Set[Partition]()
+      val leadershipChangeCallbacks =
+        mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, (mutable.Set[Partition], mutable.Set[Partition])]()
+      try {
+        val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+        val mostRecentMetadataOffsets = mutable.Map[Partition, Long]()
+        deferredPartitionsIterator.foreach { deferredPartition =>
+          val state = deferredPartition.metadata
+          val partition = deferredPartition.partition
+          if (state.leaderId == localBrokerId) {
+            leaderPartitionStates.put(partition, state)
+          } else {
+            followerPartitionStates.put(partition, state)
+          }
+          if (!deferredPartition.isNew) {
+            partitionsAlreadyExisting += state
+          }
+          mostRecentMetadataOffsets.put(partition, deferredPartition.mostRecentMetadataOffset)
+        }
+
+        val partitionsMadeLeader = makeLeaders(partitionsAlreadyExisting, leaderPartitionStates,
+          highWatermarkCheckpoints,-1, mostRecentMetadataOffsets)
+        val partitionsMadeFollower = makeFollowers(partitionsAlreadyExisting,
+          createMetadataBrokersFromCurrentCache, followerPartitionStates,

Review comment:
       I added the partition state with respect to deferral to `MetadataPartitions`.  Storing the information in the metadata cache could help here.







[GitHub] [kafka] cmccabe commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r572433038



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -30,7 +31,8 @@ import scala.jdk.CollectionConverters._
 
 
 object MetadataPartition {
-  def apply(name: String, record: PartitionRecord): MetadataPartition = {
+  val OffsetNeverDeferred = 0L // must not be a valid offset we could see (i.e. must not be positive)

Review comment:
       Can you add JavaDoc for this?







[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571817474



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -439,71 +414,85 @@ class ReplicaManager(val config: KafkaConfig,
                 responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
               }
 
-            case HostedPartition.Deferred(_) =>
+            case _: HostedPartition.Deferred =>
               throw new IllegalStateException("We should never be deferring partition metadata changes and stopping a replica when using ZooKeeper")
 
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition -> deletePartition
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        // First stop fetchers for all partitions.
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
-
-        // Second remove deleted partitions from the partition map. Fetchers rely on the
-        // ReplicaManager to get Partition's information so they must be stopped first.
-        val deletedPartitions = mutable.Set.empty[TopicPartition]
-        stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
-          if (partitionState.deletePartition) {
-            getPartition(topicPartition) match {
-              case hostedPartition@HostedPartition.Online(partition) =>
-                if (allPartitions.remove(topicPartition, hostedPartition)) {
-                  maybeRemoveTopicMetrics(topicPartition.topic)
-                  // Logs are not deleted here. They are deleted in a single batch later on.
-                  // This is done to avoid having to checkpoint for every deletions.
-                  partition.delete()
-                }
-
-              case _ =>
-            }
-
-            deletedPartitions += topicPartition
-          }
-
-          // If we were the leader, we may have some operations still waiting for completion.
-          // We force completion to prevent them from timing out.
-          completeDelayedFetchOrProduceRequests(topicPartition)
-        }
-
-        // Third delete the logs and checkpoint.
-        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
-          exception match {
-            case e: KafkaStorageException =>
+        stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
               stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                 "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-
-            case e =>
-              stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
+          } else {
+            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
                 s"${e.getClass.getName} exception: ${e.getMessage}")
               responseMap.put(topicPartition, Errors.forException(e))
           }
-        })
-
+          responseMap.put(topicPartition, Errors.forException(e))
+        }
         (responseMap, Errors.NONE)
       }
     }
   }
 
+  /**
+   * Stop the given partitions.
+   *
+   * @param partitionsToStop    A map from a topic partition to a boolean indicating
+   *                            whether the partition should be deleted.
+   *
+   * @return                    A map from partitions to exceptions which occurred.
+   *                            If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+    // First stop fetchers for all partitions.
+    val partitions = partitionsToStop.keySet
+    replicaFetcherManager.removeFetcherForPartitions(partitions)
+    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+    // Second remove deleted partitions from the partition map. Fetchers rely on the
+    // ReplicaManager to get Partition's information so they must be stopped first.
+    val partitionsToDelete = mutable.Set.empty[TopicPartition]
+    partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
+      if (shouldDelete) {
+        getPartition(topicPartition) match {
+          case hostedPartition: NonOffline => // Online or Deferred (Deferred never occurs when using ZooKeeper)

Review comment:
       This comment seems a bit redundant, no? We should have such documentation in the definition of `NonOffline` and avoid duplicating it everywhere it's used.
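
As a rough illustration of this suggestion, the sketch below documents `NonOffline` once, at its definition, so that a pattern match on it needs no explanatory comment. The hierarchy is a simplified, hypothetical model: the `name` and `isNew` fields and the `StopPartitionsSketch.partitionToDelete` helper are assumptions made for the example and do not reflect the real signatures in `ReplicaManager.scala`.

```scala
// Simplified, hypothetical model of the hosted-partition states; the real
// Kafka definitions carry different fields.
sealed trait HostedPartition

object HostedPartition {
  /** The broker has no local replica for this partition. */
  case object None extends HostedPartition

  /**
   * A hosted partition that is either Online or Deferred.
   * Deferred never occurs when using ZooKeeper.
   * Documented once here so that call sites need not repeat it.
   */
  sealed trait NonOffline extends HostedPartition {
    def name: String
  }

  final case class Online(name: String) extends NonOffline
  final case class Deferred(name: String, isNew: Boolean) extends NonOffline

  /** The local replica lives in an offline log directory. */
  case object Offline extends HostedPartition
}

object StopPartitionsSketch {
  // Returns the name of the partition to delete, if it is Online or Deferred.
  def partitionToDelete(hosted: HostedPartition): Option[String] = hosted match {
    case nonOffline: HostedPartition.NonOffline => Some(nonOffline.name)
    case _ => None
  }
}
```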







[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r573201310



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -39,11 +41,30 @@ object MetadataPartition {
       record.isr(),
       Collections.emptyList(), // TODO KAFKA-12285 handle offline replicas
       Collections.emptyList(),
-      Collections.emptyList())
+      Collections.emptyList(),
+      largestDeferredOffsetEverSeen = deferredAtOffset.getOrElse(OffsetNeverDeferred),
+      isCurrentlyDeferringChanges = deferredAtOffset.isDefined)

Review comment:
       As discussed offline, we will eliminate the information from the messages we log when applying deferred changes, and we won't carry that info around in `MetadataPartition`.  Currently `RaftReplicaManager` knows if it is deferring changes or not.  Maybe later when we get `BrokerLifecycleManager` and `BrokerMetadataListener` committed we can think about where a global boolean might live to identify if the broker is fenced or not.  It isn't critical to decide right now because we are only going to defer the application of partition metadata at startup in 2.8.
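
A toy model of the arrangement described here, where the manager itself holds the single "deferring" flag and no deferral state is carried per partition, might look like the following. `DeferringManager` and its methods are hypothetical names chosen for illustration, and partitions are reduced to a string key and a leader id; this is a sketch of the idea, not the `RaftReplicaManager` implementation.

```scala
import scala.collection.mutable

// Hypothetical toy model; names and types are illustrative, not Kafka's API.
final class DeferringManager {
  private val lock = new Object
  private var deferring = true // changes are deferred at startup
  private val deferred = mutable.LinkedHashMap[String, Int]() // partition -> leader id
  private val applied = mutable.Map[String, Int]()

  // While deferring, remember only the latest state per partition;
  // otherwise apply the change immediately.
  def handleChange(partition: String, leaderId: Int): Unit = lock.synchronized {
    if (deferring) deferred.put(partition, leaderId)
    else applied.put(partition, leaderId)
    ()
  }

  // Applies everything that was deferred and returns how many partitions were applied.
  def endDeferral(): Int = lock.synchronized {
    val count = deferred.size
    applied ++= deferred
    deferred.clear()
    deferring = false
    count
  }

  def leaderOf(partition: String): Option[Int] = lock.synchronized {
    applied.get(partition)
  }
}
```

Because the flag lives in one place, the per-partition records do not need to know whether they were ever deferred.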

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -30,7 +31,8 @@ import scala.jdk.CollectionConverters._
 
 
 object MetadataPartition {
-  def apply(name: String, record: PartitionRecord): MetadataPartition = {
+  val OffsetNeverDeferred = 0L // must not be a valid offset we could see (i.e. must not be positive)

Review comment:
       As discussed offline, we will remove this and not include the last seen offset in log messages when applying deferred changes.







[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571453212



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -248,19 +250,19 @@ class ReplicaManager(val config: KafkaConfig,
 
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
-  private val localBrokerId = config.brokerId
-  private val allPartitions = new Pool[TopicPartition, HostedPartition](
+  protected val localBrokerId = config.brokerId
+  protected val allPartitions = new Pool[TopicPartition, HostedPartition](
     valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, configRepository, this)))
   )
-  private val replicaStateChangeLock = new Object
+  protected val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
   val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   @volatile var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>

Review comment:
       Is this intended to be public? Maybe we should fix it while we're at it.
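
       As a reminder of what narrowing the scope would mean, here is a minimal sketch of Scala member visibility. The classes and members (`BaseManager`, `RaftLikeManager`, `localId`, `checkpoints`) are hypothetical and only mirror the shape of the change in this hunk; they are not the actual `ReplicaManager` code.

```scala
// Hypothetical base class mirroring the private -> protected change.
class BaseManager {
  private val internalOnly: Int = 1          // visible only inside BaseManager
  protected val localId: Int = 2             // visible to subclasses as well
  val checkpoints: Map[String, Int] = Map("dir" -> 3) // no modifier: public

  def internalValue: Int = internalOnly
}

// Hypothetical subclass: it can read protected and public members,
// but a reference to `internalOnly` here would not compile.
class RaftLikeManager extends BaseManager {
  def describe: String = s"id=$localId dirs=${checkpoints.size}"
}
```

       A member declared without a modifier is public in Scala, so anything that only the class and its subclasses need can be marked `protected` (or `private`) explicitly.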







[GitHub] [kafka] ijuma commented on pull request #10069: MINOR: Add RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#issuecomment-774696381


   @rondagostino Looks like a file is missing the license:
   
   > 19:18:36  Execution failed for task ':rat'.
   > 19:18:36  > Found 1 files with unknown licenses.

