Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/02/03 15:50:58 UTC

[GitHub] [kafka] dajac opened a new pull request, #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

dajac opened a new pull request, #13196:
URL: https://github.com/apache/kafka/pull/13196

   In the context of KIP-848, we are implementing a new group coordinator in Java. This new coordinator follows the architecture of the new quorum controller. It is basically a replicated state machine that writes to the log and commits its internal state when the writes are committed. At the moment, the only way to know when a write is committed is to use a delayed fetch. This is not ideal in our context because a delayed fetch can be completed before the write is actually committed to the log.
   
   This patch proposes to introduce a high watermark listener to the Partition/Log layers. This will allow the new group coordinator to simply listen to changes and commit its state based on this. This mechanism is simpler and lighter as well.
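   
   As an illustration only, here is a minimal sketch of how a state machine could commit its pending writes from such a listener. `HighWatermarkListener` and `PendingWrites` are hypothetical stand-ins written for this example (the partition is a plain `String` instead of a `TopicPartition`); they are not the classes added by this patch.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical stand-in for the listener proposed in this patch.
   trait HighWatermarkListener {
     def onHighWatermarkUpdated(partition: String, offset: Long): Unit
   }
   
   // Hypothetical state machine: each pending write is keyed by the offset of
   // its last record and is committed once the high watermark moves past it.
   final class PendingWrites extends HighWatermarkListener {
     private val pending = mutable.TreeMap.empty[Long, String]
     private val committed = mutable.ArrayBuffer.empty[String]
   
     def append(lastOffset: Long, update: String): Unit = synchronized {
       pending(lastOffset) = update
     }
   
     override def onHighWatermarkUpdated(partition: String, offset: Long): Unit = synchronized {
       // The high watermark is exclusive: offsets strictly below it are committed.
       val done = pending.keys.takeWhile(_ < offset).toList
       done.foreach { o =>
         committed += pending(o)
         pending.remove(o)
       }
     }
   
     def committedUpdates: List[String] = synchronized(committed.toList)
   }
   ```
   
   Keeping the callback to a cheap bookkeeping step matters here, since the listener contract says callbacks run in the thread that triggers the change and may run while locks are held.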
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1104123007


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,35 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}

Review Comment:
   Yeah, this is needed for [KAFKA-14673](https://issues.apache.org/jira/browse/KAFKA-14673).





[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108852113


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}
+
+  /**
+   * Called when the Partition is deleted.
+   */
+  def onDeleted(partition: TopicPartition): Unit = {}

Review Comment:
   This one is basically called when `Partition.delete` is called, so `delete` makes sense in my opinion. But, yeah, naming is hard :).





[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1103164836


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,35 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}

Review Comment:
   It seems in this PR these two do nothing. Are there plans to use them in the future?





[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108755031


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}
+
+  /**
+   * Called when the Partition is deleted.
+   */
+  def onDeleted(partition: TopicPartition): Unit = {}

Review Comment:
   I kind of like `onDeleted` because it goes well with `onFailed`. Isn't it implicit that we are talking about the local partition (== replica)? I could perhaps make this clearer in the javadoc.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1843,8 +1866,11 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
-    allPartitions.put(tp, HostedPartition.Offline)
-    Partition.removeMetrics(tp)
+    allPartitions.put(tp, HostedPartition.Offline) match {
+      case HostedPartition.Online(partition) =>
+        partition.fail()

Review Comment:
   `markOffline` sounds good to me.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1107792509


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.

Review Comment:
   Maybe worth mentioning that this is also called after the initial registration?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1843,8 +1866,11 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
-    allPartitions.put(tp, HostedPartition.Offline)
-    Partition.removeMetrics(tp)
+    allPartitions.put(tp, HostedPartition.Offline) match {
+      case HostedPartition.Online(partition) =>
+        partition.fail()

Review Comment:
   How about `markOffline` or something consistent with the name `markPartitionOffline`?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}
+
+  /**
+   * Called when the Partition is deleted.
+   */
+  def onDeleted(partition: TopicPartition): Unit = {}

Review Comment:
   I wonder if we can clarify in the name what the deletion signifies. I think it just means the replica was deleted, so maybe `onReplicaDeleted` or something like that perhaps?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -318,6 +358,25 @@ class Partition(val topicPartition: TopicPartition,
 
   def inSyncReplicaIds: Set[Int] = partitionState.isr
 
+  def maybeAddListener(listener: PartitionListener): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      // `log` is set to `None` when the partition is failed or deleted.
+      log match {
+        case Some(log) =>
+          listener.onHighWatermarkUpdated(topicPartition, log.highWatermark)
+          listeners.add(listener)

Review Comment:
   Is there a race here between the initial `onHighWatermarkUpdated` and the addition of the listener?





[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108752686


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -318,6 +358,25 @@ class Partition(val topicPartition: TopicPartition,
 
   def inSyncReplicaIds: Set[Int] = partitionState.isr
 
+  def maybeAddListener(listener: PartitionListener): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      // `log` is set to `None` when the partition is failed or deleted.
+      log match {
+        case Some(log) =>
+          listener.onHighWatermarkUpdated(topicPartition, log.highWatermark)
+          listeners.add(listener)

Review Comment:
   I think that you are right. On second thought, I don't need this call during the registration for the new group coordinator because it registers before doing any writes. I will remove it.
   
   @jolshan The idea was to provide an initial value to the listener.
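   
   For readers following the race discussion: independently of the fix chosen above (removing the initial call), a listener can protect itself against a stale or repeated notification by only ever moving its view of the high watermark forward. The sketch below is an assumption-laden illustration; `MonotonicHighWatermark` is a hypothetical class, not code from this PR.
   
   ```scala
   import java.util.concurrent.atomic.AtomicLong
   
   // Hypothetical listener state: it keeps the largest offset seen so far, so
   // a notification that arrives late or twice cannot move it backwards.
   final class MonotonicHighWatermark {
     private val highWatermark = new AtomicLong(-1L)
   
     def onHighWatermarkUpdated(offset: Long): Unit = {
       highWatermark.accumulateAndGet(offset, (a: Long, b: Long) => math.max(a, b))
       ()
     }
   
     def current: Long = highWatermark.get()
   }
   ```
   
   This only makes the listener tolerant of reordering; it does not help a listener that misses an update altogether.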





[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1103166441


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,35 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}

Review Comment:
   Taking a closer look, I guess we don't have a Listener that currently does anything in the main code -- so looks like this PR lays the groundwork for one. 





[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108753117


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.

Review Comment:
   See my reply to your other comment. I will remove that call.





[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108755781


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -285,6 +314,17 @@ class Partition(val topicPartition: TopicPartition,
   // If ReplicaAlterLogDir command is in progress, this is future location of the log
   @volatile var futureLog: Option[UnifiedLog] = None
 
+  // Partition listeners
+  private val listeners = new CopyOnWriteArrayList[PartitionListener]()
+
+  private val logOffsetsListener = new LogOffsetsListener {
+    override def onHighWatermarkUpdated(offset: Long): Unit = {

Review Comment:
   > For my understanding, the way we update the Partition Listeners is that we have our own Log layer listener we rely on for this purpose?
   
   That's correct.
   
   > Will we use the logOffsetsListener for any other purposes?
   
   Not at the moment.
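   
   The forwarding arrangement confirmed above can be sketched roughly as follows. This is a simplified, self-contained illustration: `LogListenerSketch`, `PartitionListenerSketch` and `PartitionSketch` are hypothetical names, and the partition is identified by a plain `String` rather than a `TopicPartition`.
   
   ```scala
   import java.util.concurrent.CopyOnWriteArrayList
   
   // Hypothetical stand-ins for the Log-level and Partition-level listeners.
   trait LogListenerSketch {
     def onHighWatermarkUpdated(offset: Long): Unit
   }
   
   trait PartitionListenerSketch {
     def onHighWatermarkUpdated(partition: String, offset: Long): Unit
   }
   
   final class PartitionSketch(name: String) {
     // Copy-on-write keeps iteration safe while listeners are added or removed.
     private val listeners = new CopyOnWriteArrayList[PartitionListenerSketch]()
   
     // The single Log-level listener fans each update out to every
     // Partition-level listener, adding the partition identity on the way.
     val logListener: LogListenerSketch = new LogListenerSketch {
       override def onHighWatermarkUpdated(offset: Long): Unit =
         listeners.forEach((l: PartitionListenerSketch) => l.onHighWatermarkUpdated(name, offset))
     }
   
     def addListener(listener: PartitionListenerSketch): Unit = {
       listeners.add(listener)
       ()
     }
   
     def removeListener(listener: PartitionListenerSketch): Unit = {
       listeners.remove(listener)
       ()
     }
   }
   ```
   
   With this shape the Log only ever knows about one listener, while the Partition owns registration and removal.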





[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1107850529


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -285,6 +314,17 @@ class Partition(val topicPartition: TopicPartition,
   // If ReplicaAlterLogDir command is in progress, this is future location of the log
   @volatile var futureLog: Option[UnifiedLog] = None
 
+  // Partition listeners
+  private val listeners = new CopyOnWriteArrayList[PartitionListener]()
+
+  private val logOffsetsListener = new LogOffsetsListener {
+    override def onHighWatermarkUpdated(offset: Long): Unit = {

Review Comment:
   For my understanding, the way we update the Partition Listeners is that we have our own Log layer listener we rely on for this purpose?
   
   Will we use the logOffsetsListener for any other purposes?





[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1103166441


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,35 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}

Review Comment:
   Taking a closer look, I guess we don't have a PartitionListener that currently does anything in the main code -- so looks like this PR lays the groundwork for one. 





[GitHub] [kafka] clolov commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1097383481


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,25 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
    * AND that locks may be held during their execution. They are meant to be used
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener1 = new MockPartitionListener()
+    val listener2 = new MockPartitionListener()
+
+    assertTrue(partition.maybeAddListener(listener1))
+    listener1.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener1.verify()
+    listener2.verify()
+
+    assertTrue(partition.maybeAddListener(listener2))
+    listener2.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+    partition.removeListener(listener1)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify()
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    partition.delete()
+
+    assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)

Review Comment:
   Nit:
   ```suggestion
       assertTrue(partition.maybeAddListener(listener))
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -334,6 +334,29 @@ class ReplicaManager(val config: KafkaConfig,
     topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
   }
 
+  /**
+   * Registers the provided listener to the partition iff the partition is online.
+   */
+  def maybeAddListener(partition: TopicPartition, listener: PartitionListener): Boolean = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.maybeAddListener(listener)
+      case _ =>
+        false
+    }
+  }
+
+  /**
+   * Removes the provided listener from the partition.
+   */
+  def removeListener(partition: TopicPartition, listener: PartitionListener): Unit = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.removeListener(listener)
+      case _ => // Ignore

Review Comment:
   Could you please elaborate on why we cannot remove a listener from an offline partition as well? I probably lack context, but this seems like a reasonable operation to me.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -73,6 +73,22 @@ object LogAppendInfo {
       offsetsMonotonic = false, -1L, recordErrors, errorMessage)
 }
 
+/**
+ * Listener receive notification from the Log.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
    * AND that locks may be held during their execution. They are meant to be used
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener1 = new MockPartitionListener()
+    val listener2 = new MockPartitionListener()
+
+    assertTrue(partition.maybeAddListener(listener1))
+    listener1.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener1.verify()
+    listener2.verify()
+
+    assertTrue(partition.maybeAddListener(listener2))
+    listener2.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+    partition.removeListener(listener1)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify()
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    partition.delete()
+
+    assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)
+    listener.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener.verify()
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+    partition.truncateFullyAndStartAt(0L, false)
+
+    listener.verify(expectedHighWatermark = 0L)
+  }
+
+  @Test
+  def testPartitionListenerWhenCurrentIsReplacedWithFutureLog(): Unit = {
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
+    assertTrue(partition.log.isDefined)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)

Review Comment:
   My only reasoning for the nit is that the test can fail fast if someone changes the implementation logic:
   ```suggestion
       assertTrue(partition.maybeAddListener(listener))
   ```





[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1108835492


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -44,11 +44,40 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata, LogOffsetsListener}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+trait PartitionListener {
+  /**
+   * Called when the Log increments its high watermark.
+   */
+  def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {}
+
+  /**
+   * Called when the Partition has a failure (e.g. goes offline).
+   */
+  def onFailed(partition: TopicPartition): Unit = {}
+
+  /**
+   * Called when the Partition is deleted.
+   */
+  def onDeleted(partition: TopicPartition): Unit = {}

Review Comment:
   Is there another word we could use, like "removed"? Maybe that's also unclear, but I remember from the discussion about bumping the epoch on stop replica that there are a lot of reasons a replica is no longer on a broker. 😅





[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1107851690


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -318,6 +358,25 @@ class Partition(val topicPartition: TopicPartition,
 
   def inSyncReplicaIds: Set[Int] = partitionState.isr
 
+  def maybeAddListener(listener: PartitionListener): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      // `log` is set to `None` when the partition is failed or deleted.
+      log match {
+        case Some(log) =>
+          listener.onHighWatermarkUpdated(topicPartition, log.highWatermark)
+          listeners.add(listener)

Review Comment:
   Sorry if I missed it, but why do we call onHighWatermarkUpdated here?
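
The quoted diff calls `onHighWatermarkUpdated` before adding the listener, and the tests in this PR expect a newly added listener to observe the current high watermark right away. The thread does not state the motivation, so reading this as an "initial snapshot" is an assumption. Below is a minimal sketch of that ordering, using hypothetical stand-in types (`HwListener`, `SnapshotPartition`, `RecordingListener`) rather than Kafka's classes:

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical listener, shaped after the high watermark callback in the diff.
trait HwListener {
  def onHighWatermarkUpdated(offset: Long): Unit
}

// Hypothetical stand-in for a partition that tracks a high watermark.
class SnapshotPartition(initialHighWatermark: Long) {
  private val listeners = new CopyOnWriteArrayList[HwListener]()
  private var highWatermark: Long = initialHighWatermark

  // As in the diff: deliver the current value first, then store the listener.
  def addListener(listener: HwListener): Unit = synchronized {
    listener.onHighWatermarkUpdated(highWatermark)
    listeners.add(listener)
  }

  // Notify registered listeners only when the high watermark moves forward.
  def advance(offset: Long): Unit = synchronized {
    if (offset > highWatermark) {
      highWatermark = offset
      listeners.forEach(l => l.onHighWatermarkUpdated(offset))
    }
  }
}

// Records every offset it is told about, in order.
class RecordingListener extends HwListener {
  var seen: Vector[Long] = Vector.empty
  override def onHighWatermarkUpdated(offset: Long): Unit = {
    seen = seen :+ offset
  }
}
```

With this ordering, a listener that registers late still starts from the current value instead of waiting for the next change, so it does not need a separate call to read the high watermark.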





[GitHub] [kafka] dajac commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1098775244


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -334,6 +334,29 @@ class ReplicaManager(val config: KafkaConfig,
     topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
   }
 
+  /**
+   * Registers the provided listener to the partition iff the partition is online.
+   */
+  def maybeAddListener(partition: TopicPartition, listener: PartitionListener): Boolean = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.maybeAddListener(listener)
+      case _ =>
+        false
+    }
+  }
+
+  /**
+   * Removes the provided listener from the partition.
+   */
+  def removeListener(partition: TopicPartition, listener: PartitionListener): Unit = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.removeListener(listener)
+      case _ => // Ignore

Review Comment:
   When the partition goes offline or is deleted, the `Partition` object no longer exists, so we cannot remove the listener from it.
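
To illustrate the point, here is a simplified sketch of why removal can only act on an online partition. All names (`Hosted`, `OnlineHosted`, `ToyPartition`, `ToyReplicaManager`) are hypothetical stand-ins, and listeners are plain strings; this is not the `ReplicaManager` code itself:

```scala
import scala.collection.mutable

// Hypothetical partition that owns its listeners.
class ToyPartition {
  val listeners = mutable.Set.empty[String]
}

// Hypothetical, simplified model of the hosted-partition states.
// Only the online state carries a partition object.
sealed trait Hosted
case object NotHosted extends Hosted
case object OfflineHosted extends Hosted
final case class OnlineHosted(partition: ToyPartition) extends Hosted

class ToyReplicaManager {
  private val hosted = mutable.Map.empty[String, Hosted]

  def put(name: String, state: Hosted): Unit = hosted.update(name, state)

  // Registers the listener only if the partition is online.
  def maybeAddListener(name: String, listener: String): Boolean =
    hosted.getOrElse(name, NotHosted) match {
      case OnlineHosted(p) => p.listeners.add(listener)
      case _ => false
    }

  // Once the state is no longer online there is no partition object
  // to reach, so there is nothing to remove the listener from.
  def removeListener(name: String, listener: String): Unit =
    hosted.getOrElse(name, NotHosted) match {
      case OnlineHosted(p) =>
        p.listeners.remove(listener)
        ()
      case _ => ()
    }
}
```

In this model the listener set lives inside the partition object, so replacing the online state with an offline one drops the only path to it.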





[GitHub] [kafka] dajac merged pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13196:
URL: https://github.com/apache/kafka/pull/13196




[GitHub] [kafka] jolshan commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1107850529


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -285,6 +314,17 @@ class Partition(val topicPartition: TopicPartition,
   // If ReplicaAlterLogDir command is in progress, this is future location of the log
   @volatile var futureLog: Option[UnifiedLog] = None
 
+  // Partition listeners
+  private val listeners = new CopyOnWriteArrayList[PartitionListener]()
+
+  private val logOffsetsListener = new LogOffsetsListener {
+    override def onHighWatermarkUpdated(offset: Long): Unit = {

Review Comment:
   For my understanding, the way we update the Partition listeners is that we rely on our own Log-layer listener?
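
The relay described in this question can be sketched as follows: the partition hands a single log-level listener to its log and fans each update out to the partition-level listeners. The types here (`OffsetsListener`, `NamedPartitionListener`, `RelayLog`, `RelayPartition`) are hypothetical simplifications, and the partition is identified by a plain string; the sketch only mirrors the shape visible in the quoted diff:

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical log-level callback, shaped after `LogOffsetsListener` in the diff.
trait OffsetsListener {
  def onHighWatermarkUpdated(offset: Long): Unit
}

// Hypothetical partition-level callback; it also carries the partition name.
trait NamedPartitionListener {
  def onHighWatermarkUpdated(partition: String, offset: Long): Unit
}

// Hypothetical log that reports high watermark changes to one listener.
class RelayLog(listener: OffsetsListener) {
  private var highWatermark = 0L

  def updateHighWatermark(offset: Long): Unit = {
    if (offset != highWatermark) {
      highWatermark = offset
      listener.onHighWatermarkUpdated(offset)
    }
  }
}

class RelayPartition(name: String) {
  private val listeners = new CopyOnWriteArrayList[NamedPartitionListener]()

  // The partition's own log-level listener forwards each update to
  // every registered partition listener, adding the partition name.
  private val logListener = new OffsetsListener {
    override def onHighWatermarkUpdated(offset: Long): Unit =
      listeners.forEach(l => l.onHighWatermarkUpdated(name, offset))
  }

  val log = new RelayLog(logListener)

  def addListener(l: NamedPartitionListener): Unit = { listeners.add(l); () }
  def removeListener(l: NamedPartitionListener): Unit = { listeners.remove(l); () }
}
```

A copy-on-write list suits this shape because notifications iterate the list far more often than listeners are added or removed, and iteration stays safe while another thread modifies it.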


