You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "clolov (via GitHub)" <gi...@apache.org> on 2023/02/06 14:39:41 UTC

[GitHub] [kafka] clolov commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

clolov commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1097383481


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,25 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
    * AND that locks may be held during their execution. They are meant to be used
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener1 = new MockPartitionListener()
+    val listener2 = new MockPartitionListener()
+
+    assertTrue(partition.maybeAddListener(listener1))
+    listener1.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener1.verify()
+    listener2.verify()
+
+    assertTrue(partition.maybeAddListener(listener2))
+    listener2.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+    partition.removeListener(listener1)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify()
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    partition.delete()
+
+    assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)

Review Comment:
   Nit:
   ```suggestion
       assertTrue(partition.maybeAddListener(listener))
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -334,6 +334,29 @@ class ReplicaManager(val config: KafkaConfig,
     topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
   }
 
+  /**
+   * Registers the provided listener to the partition iff the partition is online.
+   */
+  def maybeAddListener(partition: TopicPartition, listener: PartitionListener): Boolean = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.maybeAddListener(listener)
+      case _ =>
+        false
+    }
+  }
+
+  /**
+   * Removes the provided listener from the partition.
+   */
+  def removeListener(partition: TopicPartition, listener: PartitionListener): Unit = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.removeListener(listener)
+      case _ => // Ignore

Review Comment:
   Could you please elaborate on why we cannot remove a listener from an offline partition as well? I most likely lack context, but this seems like a reasonable operation to me.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -73,6 +73,22 @@ object LogAppendInfo {
       offsetsMonotonic = false, -1L, recordErrors, errorMessage)
 }
 
+/**
+ * Listener receive notification from the Log.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
    * AND that locks may be held during their execution. They are meant to be used
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener1 = new MockPartitionListener()
+    val listener2 = new MockPartitionListener()
+
+    assertTrue(partition.maybeAddListener(listener1))
+    listener1.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener1.verify()
+    listener2.verify()
+
+    assertTrue(partition.maybeAddListener(listener2))
+    listener2.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+    partition.removeListener(listener1)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify()
+    listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    partition.delete()
+
+    assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)
+    listener.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener.verify()
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+    partition.truncateFullyAndStartAt(0L, false)
+
+    listener.verify(expectedHighWatermark = 0L)
+  }
+
+  @Test
+  def testPartitionListenerWhenCurrentIsReplacedWithFutureLog(): Unit = {
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
+    assertTrue(partition.log.isDefined)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)

Review Comment:
   My only reason for this nit is that the test would fail fast if someone changes the implementation logic:
   ```suggestion
       assertTrue(partition.maybeAddListener(listener))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org