Posted to jira@kafka.apache.org by "hachikuji (via GitHub)" <gi...@apache.org> on 2023/02/15 23:23:17 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

hachikuji commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1107792509


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.

Review Comment:
   Maybe worth mentioning that this is also called after the initial registration?
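The behavior the reviewer suggests documenting can be shown with a minimal single-threaded sketch. Registering a listener immediately reports the current high watermark, and later increments are reported as they happen. `ToyPartition`, `RecordingListener`, and the local `TopicPartition` case class are hypothetical stand-ins, not Kafka classes. The listener trait copies the shape shown in the diff.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Same shape as the PartitionListener trait in the diff: every callback defaults to a no-op.
trait PartitionListener {
  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
  def onFailed(partition: TopicPartition): Unit = {}
  def onDeleted(partition: TopicPartition): Unit = {}
}

// Hypothetical single-threaded model of an Online partition (not Kafka's Partition).
final class ToyPartition(val topicPartition: TopicPartition, initialHighWatermark: Long) {
  private var highWatermark = initialHighWatermark
  private val listeners = ArrayBuffer.empty[PartitionListener]

  // Registration reports the current high watermark right away, so the listener
  // starts from a known value instead of waiting for the next increment.
  // Always succeeds here because this toy model has no Offline state.
  def maybeAddListener(listener: PartitionListener): Boolean = {
    listener.onHighWatermarkUpdated(topicPartition, highWatermark)
    listeners += listener
    true
  }

  // Only a strictly larger value counts as an increment and triggers notifications.
  def incrementHighWatermark(newHighWatermark: Long): Unit = {
    if (newHighWatermark > highWatermark) {
      highWatermark = newHighWatermark
      listeners.foreach(_.onHighWatermarkUpdated(topicPartition, newHighWatermark))
    }
  }
}

// Hypothetical listener that records every offset it is told about.
final class RecordingListener extends PartitionListener {
  val seen = ArrayBuffer.empty[Long]
  override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit =
    seen += offset
}

val tp = TopicPartition("events", 0)
val partition = new ToyPartition(tp, initialHighWatermark = 10L)
val listener = new RecordingListener
partition.maybeAddListener(listener)
partition.incrementHighWatermark(15L)
// listener.seen now holds 10 (reported at registration) followed by 15
```

The first recorded value comes from registration, not from an increment. A callback note in the Scaladoc would need to mention this.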



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1843,8 +1866,11 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
-    allPartitions.put(tp, HostedPartition.Offline)
-    Partition.removeMetrics(tp)
+    allPartitions.put(tp, HostedPartition.Offline) match {
+      case HostedPartition.Online(partition) =>
+        partition.fail()

Review Comment:
   How about `markOffline` or something consistent with the name `markPartitionOffline`?
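The hunk relies on the map's `put` returning the previous entry. One call both installs `Offline` and reveals whether a live partition needs to be told. The following is a minimal sketch of that pattern under stated assumptions. A `java.util.concurrent.ConcurrentHashMap` stands in for `allPartitions`, whose real type is not shown here. `ToyPartition`, `ToyReplicaManager`, and the two-state `HostedPartition` are hypothetical simplifications. `markOffline` is the reviewer's suggested name, used here only for illustration. The hunk is cut off after the `Online` case, so the sketch stops at notifying the partition.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical partition that only records whether it was taken offline.
final class ToyPartition(val topicPartition: TopicPartition) {
  @volatile var offline = false
  // Name follows the reviewer's suggestion; the diff calls this `fail()`.
  def markOffline(): Unit = { offline = true }
}

// Simplified HostedPartition states: only the two that matter for this sketch.
sealed trait HostedPartition
object HostedPartition {
  final case class Online(partition: ToyPartition) extends HostedPartition
  case object Offline extends HostedPartition
}

final class ToyReplicaManager {
  // Assumed stand-in for `allPartitions`.
  private val allPartitions = new ConcurrentHashMap[TopicPartition, HostedPartition]()
  private val replicaStateChangeLock = new Object

  def addOnline(partition: ToyPartition): Unit = {
    allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
    ()
  }

  def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock.synchronized {
    // `put` returns the previous value (null if absent), so this single call swaps in
    // Offline and tells us whether there was an Online partition to notify.
    allPartitions.put(tp, HostedPartition.Offline) match {
      case HostedPartition.Online(partition) => partition.markOffline()
      case _                                 => () // absent (null) or already Offline
    }
  }

  def state(tp: TopicPartition): Option[HostedPartition] = Option(allPartitions.get(tp))
}

val tp = TopicPartition("events", 0)
val partition = new ToyPartition(tp)
val replicaManager = new ToyReplicaManager
replicaManager.addOnline(partition)
replicaManager.markPartitionOffline(tp)
// No previous entry: the partition is recorded as Offline, but nothing is notified.
replicaManager.markPartitionOffline(TopicPartition("unknown", 3))
```

The wildcard case also covers the `null` returned for a partition that was never hosted. A second `markPartitionOffline` on the same partition therefore does not notify it twice.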



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}
+
+  /**
+   * Called when the Partition is deleted.
+   */
+  def onDeleted(partition: TopicPartition): Unit = {}

Review Comment:
   I wonder if we can clarify in the name what the deletion signifies. I think it just means the replica was deleted, so perhaps `onReplicaDeleted` or something like that?
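The trait's Scaladoc describes a small lifecycle. Listeners receive high-watermark updates only while the partition is Online. After that they receive exactly one terminal callback, `onFailed` or `onDeleted`, and then nothing further. Below is a minimal sketch of one way to enforce that contract. `ListenerRegistry` and `EventRecorder` are hypothetical helpers, not part of the PR. The callback keeps the diff's name `onDeleted`. A rename such as `onReplicaDeleted` would change only the name, not this contract.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Same shape as the PartitionListener trait in the diff.
trait PartitionListener {
  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
  def onFailed(partition: TopicPartition): Unit = {}
  def onDeleted(partition: TopicPartition): Unit = {}
}

// Hypothetical registry: updates only while online, then one terminal callback, then silence.
final class ListenerRegistry(topicPartition: TopicPartition) {
  private val listeners = ArrayBuffer.empty[PartitionListener]
  private var online = true

  // Registration is refused once the partition has failed or been deleted.
  def add(listener: PartitionListener): Boolean = synchronized {
    if (online) { listeners += listener; true } else false
  }

  def highWatermarkUpdated(offset: Long): Unit = synchronized {
    if (online) listeners.foreach(_.onHighWatermarkUpdated(topicPartition, offset))
  }

  def failed(): Unit = terminate(_.onFailed(topicPartition))

  def deleted(): Unit = terminate(_.onDeleted(topicPartition))

  // Leave the online state first so any later failed()/deleted() is a no-op,
  // then deliver the terminal callback and drop every listener.
  private def terminate(callback: PartitionListener => Unit): Unit = synchronized {
    if (online) {
      online = false
      listeners.foreach(callback)
      listeners.clear()
    }
  }
}

// Hypothetical listener that records the order of callbacks.
final class EventRecorder extends PartitionListener {
  val events = ArrayBuffer.empty[String]
  override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit =
    events += s"hw:$offset"
  override def onFailed(partition: TopicPartition): Unit = events += "failed"
  override def onDeleted(partition: TopicPartition): Unit = events += "deleted"
}

val registry = new ListenerRegistry(TopicPartition("events", 0))
val recorder = new EventRecorder
registry.add(recorder)
registry.highWatermarkUpdated(5L)
registry.deleted()
registry.failed()                 // ignored: a terminal callback was already delivered
registry.highWatermarkUpdated(6L) // ignored: no notifications after the terminal state
```

As in the trait's Scaladoc, callbacks here run while a lock (the registry's monitor) is held. Listeners should therefore only record the notification and do any real work elsewhere.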



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -318,6 +358,25 @@ class Partition(val topicPartition: TopicPartition,
 
   def inSyncReplicaIds: Set[Int] = partitionState.isr
 
+  def maybeAddListener(listener: PartitionListener): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      // `log` is set to `None` when the partition is failed or deleted.
+      log match {
+        case Some(log) =>
+          listener.onHighWatermarkUpdated(topicPartition, log.highWatermark)
+          listeners.add(listener)

Review Comment:
   Is there a race here between the initial `onHighWatermarkUpdated` and the addition of the listener?
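The race depends on an assumption. Suppose a high-watermark increment can run concurrently with `maybeAddListener`, for example because it also holds only the read side of `leaderIsrUpdateLock`. Then an increment that lands between the initial callback and `listeners.add` notifies the existing listeners but not the new one. The new listener is left on a stale offset. The following sketch shows one way to close that gap under this assumption. A single dedicated lock (the hypothetical `hwLock`) covers both "report current HW, then add" and "advance HW, then notify". `ToyPartition`, `LatestOffset`, and `RegistrationRace` are illustrative names, not Kafka code.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

trait PartitionListener {
  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
}

// Hypothetical partition model. `hwLock` is an assumed dedicated lock, not a Kafka field:
// registration and increments both hold it, so no increment can fall between
// the initial callback and the listener being added.
final class ToyPartition(tp: TopicPartition) {
  private val hwLock = new Object
  private var highWatermark = 0L
  private val listeners = ArrayBuffer.empty[PartitionListener]

  def maybeAddListener(listener: PartitionListener): Boolean = hwLock.synchronized {
    listener.onHighWatermarkUpdated(tp, highWatermark)
    listeners += listener
    true
  }

  def incrementHighWatermark(newHighWatermark: Long): Unit = hwLock.synchronized {
    if (newHighWatermark > highWatermark) {
      highWatermark = newHighWatermark
      listeners.foreach(_.onHighWatermarkUpdated(tp, newHighWatermark))
    }
  }
}

// Hypothetical listener that keeps only the most recent offset it was told about.
final class LatestOffset extends PartitionListener {
  val latest = new AtomicLong(-1L)
  override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit =
    latest.set(offset)
}

object RegistrationRace {
  // Registers one listener while another thread applies `increments` HW bumps and
  // returns the last offset the listener saw. With the shared lock this equals
  // `increments`; without it, the listener could be left on a stale value.
  def run(increments: Long): Long = {
    val partition = new ToyPartition(TopicPartition("events", 0))
    val listener = new LatestOffset
    val pool = Executors.newFixedThreadPool(2)
    val start = new CountDownLatch(1)
    pool.submit(new Runnable {
      def run(): Unit = {
        start.await()
        var offset = 1L
        while (offset <= increments) {
          partition.incrementHighWatermark(offset)
          offset += 1
        }
      }
    })
    pool.submit(new Runnable {
      def run(): Unit = {
        start.await()
        partition.maybeAddListener(listener)
      }
    })
    start.countDown()
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    listener.latest.get
  }
}
```

Adding the listener first and then reporting the current value does not fix the race on its own. A concurrent increment could deliver a newer offset to the freshly added listener just before the registering thread delivers the older value it read. Holding one lock on both paths also means callbacks run under that lock. The trait's Scaladoc already allows for this ("locks may be held").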



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
