You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "clolov (via GitHub)" <gi...@apache.org> on 2023/02/06 14:39:41 UTC
[GitHub] [kafka] clolov commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers
clolov commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1097383481
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,25 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used
Review Comment:
```suggestion
* AND that locks may be held during their execution. They are meant to be used
```
##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(replicas, partition.assignmentState.replicas)
}
+ @Test
+ def testAddAndRemoveListeners(): Unit = {
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ val listener1 = new MockPartitionListener()
+ val listener2 = new MockPartitionListener()
+
+ assertTrue(partition.maybeAddListener(listener1))
+ listener1.verify(expectedHighWatermark = 0L)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ listener1.verify()
+ listener2.verify()
+
+ assertTrue(partition.maybeAddListener(listener2))
+ listener2.verify(expectedHighWatermark = 0L)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ fetchFollower(
+ partition = partition,
+ replicaId = brokerId + 1,
+ fetchOffset = partition.localLogOrException.logEndOffset
+ )
+
+ listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+ listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+ partition.removeListener(listener1)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ fetchFollower(
+ partition = partition,
+ replicaId = brokerId + 1,
+ fetchOffset = partition.localLogOrException.logEndOffset
+ )
+
+ listener1.verify()
+ listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+ }
+
+ @Test
+ def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ partition.delete()
+
+ assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+ }
+
+ @Test
+ def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ val listener = new MockPartitionListener()
+ partition.maybeAddListener(listener)
Review Comment:
Nit:
```suggestion
assertTrue(partition.maybeAddListener(listener))
```
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -334,6 +334,29 @@ class ReplicaManager(val config: KafkaConfig,
topicPartitions.foreach(tp => delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
}
+ /**
+ * Registers the provided listener to the partition iff the partition is online.
+ */
+ def maybeAddListener(partition: TopicPartition, listener: PartitionListener): Boolean = {
+ getPartition(partition) match {
+ case HostedPartition.Online(partition) =>
+ partition.maybeAddListener(listener)
+ case _ =>
+ false
+ }
+ }
+
+ /**
+ * Removes the provided listener from the partition.
+ */
+ def removeListener(partition: TopicPartition, listener: PartitionListener): Unit = {
+ getPartition(partition) match {
+ case HostedPartition.Online(partition) =>
+ partition.removeListener(listener)
+ case _ => // Ignore
Review Comment:
Could you please elaborate on why we cannot remove a listener from an offline partition as well? I most likely lack context, but this seems like a reasonable operation to me.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -73,6 +73,22 @@ object LogAppendInfo {
offsetsMonotonic = false, -1L, recordErrors, errorMessage)
}
+/**
+ * Listener receive notification from the Log.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used
Review Comment:
```suggestion
* AND that locks may be held during their execution. They are meant to be used
```
##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(replicas, partition.assignmentState.replicas)
}
+ @Test
+ def testAddAndRemoveListeners(): Unit = {
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ val listener1 = new MockPartitionListener()
+ val listener2 = new MockPartitionListener()
+
+ assertTrue(partition.maybeAddListener(listener1))
+ listener1.verify(expectedHighWatermark = 0L)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ listener1.verify()
+ listener2.verify()
+
+ assertTrue(partition.maybeAddListener(listener2))
+ listener2.verify(expectedHighWatermark = 0L)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ fetchFollower(
+ partition = partition,
+ replicaId = brokerId + 1,
+ fetchOffset = partition.localLogOrException.logEndOffset
+ )
+
+ listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+ listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+ partition.removeListener(listener1)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ fetchFollower(
+ partition = partition,
+ replicaId = brokerId + 1,
+ fetchOffset = partition.localLogOrException.logEndOffset
+ )
+
+ listener1.verify()
+ listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+ }
+
+ @Test
+ def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ partition.delete()
+
+ assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+ }
+
+ @Test
+ def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+ partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ val listener = new MockPartitionListener()
+ partition.maybeAddListener(listener)
+ listener.verify(expectedHighWatermark = 0L)
+
+ partition.appendRecordsToLeader(
+ records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
+ origin = AppendOrigin.CLIENT,
+ requiredAcks = 0,
+ requestLocal = RequestLocal.NoCaching
+ )
+
+ listener.verify()
+
+ fetchFollower(
+ partition = partition,
+ replicaId = brokerId + 1,
+ fetchOffset = partition.localLogOrException.logEndOffset
+ )
+
+ listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
+
+ partition.truncateFullyAndStartAt(0L, false)
+
+ listener.verify(expectedHighWatermark = 0L)
+ }
+
+ @Test
+ def testPartitionListenerWhenCurrentIsReplacedWithFutureLog(): Unit = {
+ logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+ partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
+ assertTrue(partition.log.isDefined)
+
+ partition.makeLeader(
+ new LeaderAndIsrPartitionState()
+ .setControllerEpoch(0)
+ .setLeader(brokerId)
+ .setLeaderEpoch(0)
+ .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setIsNew(true),
+ offsetCheckpoints,
+ topicId = None)
+
+ val listener = new MockPartitionListener()
+ partition.maybeAddListener(listener)
Review Comment:
My only reason for the nit is that the test would fail fast if someone changes the implementation logic:
```suggestion
assertTrue(partition.maybeAddListener(listener))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org