You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/03 23:35:40 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464716040



##########
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##########
@@ -0,0 +1,121 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 60000L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                                 val zkClient: KafkaZkClient,
+                                 val scheduler: Scheduler,
+                                 val brokerId: Int,
+                                 val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    pendingIsrUpdates synchronized {
+      pendingIsrUpdates += alterIsrItem
+      lastIsrChangeMs.set(System.currentTimeMillis())
+    }
+  }
+
+  override def startup(): Unit = {
+    scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)

Review comment:
       That makes sense. I'll change that (this was pulled in from the previous ISR notification code in ReplicaManager)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org