Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/02 19:40:41 UTC

[GitHub] [kafka] mumrah opened a new pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

mumrah opened a new pull request #9677:
URL: https://github.com/apache/kafka/pull/9677


   In #9100, we did not include the high-level ISR shrink/expand metrics that are managed by ReplicaManager. This PR adds a small abstraction that lets Partition mark these metrics without taking ReplicaManager as a dependency.
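   The abstraction described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the merged code: `markExpand` and `markFailed` come from the diffs in this thread, `markShrink` is assumed by symmetry, and `MeterStandIn`, `ReplicaManagerStandIn`, `PartitionSketch`, and `PartitionFactory` are hypothetical stand-ins for ReplicaManager's Yammer meters and for `Partition`.

```scala
import java.util.concurrent.atomic.AtomicLong

// Narrow interface that Partition depends on instead of ReplicaManager.
// markShrink is assumed by symmetry with markExpand.
trait IsrChangeListener {
  def markExpand(): Unit
  def markShrink(): Unit
  def markFailed(): Unit
}

// Hypothetical stand-in for a Yammer meter: it only counts marks.
final class MeterStandIn {
  private val count = new AtomicLong(0L)
  def mark(): Unit = { count.incrementAndGet(); () }
  def getCount: Long = count.get()
}

// Hypothetical stand-in for the ISR meters owned by ReplicaManager
// (field names are assumed for illustration).
final class ReplicaManagerStandIn {
  val isrExpandRate = new MeterStandIn
  val isrShrinkRate = new MeterStandIn
  val failedIsrUpdatesRate = new MeterStandIn
}

// Hypothetical stand-in for Partition: it only sees the trait.
final class PartitionSketch(isrChangeListener: IsrChangeListener) {
  private var isr: Set[Int] = Set.empty

  def expandIsr(replicaId: Int): Unit = {
    isr += replicaId
    isrChangeListener.markExpand()
  }

  def shrinkIsr(replicaId: Int): Unit = {
    isr -= replicaId
    isrChangeListener.markShrink()
  }

  def currentIsr: Set[Int] = isr
}

// Hypothetical factory: the only place that knows about ReplicaManager.
object PartitionFactory {
  def apply(replicaManager: ReplicaManagerStandIn): PartitionSketch = {
    val listener = new IsrChangeListener {
      override def markExpand(): Unit = replicaManager.isrExpandRate.mark()
      override def markShrink(): Unit = replicaManager.isrShrinkRate.mark()
      override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
    }
    new PartitionSketch(listener)
  }
}
```

   Because `PartitionSketch` only sees the trait, a test can hand it a no-op or counting listener instead of constructing a ReplicaManager; only the factory knows where the meters live.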
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r534547385



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1406,10 +1429,13 @@ class Partition(val topicPartition: TopicPartition,
         case Left(error: Errors) => error match {
           case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
             debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
+            isrChangeMetrics.markFailed()
           case Errors.FENCED_LEADER_EPOCH =>
             debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
+            isrChangeMetrics.markFailed()
           case Errors.INVALID_UPDATE_VERSION =>
             debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
+            isrChangeMetrics.markFailed()
           case _ =>
             warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
             sendAlterIsrRequest(proposedIsrState)

Review comment:
       Note that I'm not including the `markFailed` call here. Do you think we should include it here, or possibly introduce a new retry metric?







[GitHub] [kafka] hachikuji commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r534519335



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -107,10 +106,24 @@ object Partition extends KafkaMetricsGroup {
   def apply(topicPartition: TopicPartition,
             time: Time,
             replicaManager: ReplicaManager): Partition = {
+
+    val isrChangeMetrics = new IsrChangeMetrics {
+      override def markExpand(): Unit = {
+        replicaManager.recordIsrChange(topicPartition)

Review comment:
       Hmm... I had forgotten about this. The purpose of this call is to propagate the ISR change to the controller, but we shouldn't need to do this on the AlterIsr path. This also makes your initial name `IsrChangeListener` a bit more accurate 😅 .







[GitHub] [kafka] mumrah commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r535210273



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1406,10 +1429,13 @@ class Partition(val topicPartition: TopicPartition,
         case Left(error: Errors) => error match {
           case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
             debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
+            isrChangeMetrics.markFailed()
           case Errors.FENCED_LEADER_EPOCH =>
             debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
+            isrChangeMetrics.markFailed()
           case Errors.INVALID_UPDATE_VERSION =>
             debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
+            isrChangeMetrics.markFailed()
           case _ =>
             warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
             sendAlterIsrRequest(proposedIsrState)

Review comment:
       Sounds good







[GitHub] [kafka] mumrah commented on pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#issuecomment-738303699


   The failed tests are known to be flaky.
   
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9677/8/
   ![image](https://user-images.githubusercontent.com/55116/101087722-9c578700-3580-11eb-8f88-604ee09eda8c.png)
   





[GitHub] [kafka] mumrah merged pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #9677:
URL: https://github.com/apache/kafka/pull/9677


   





[GitHub] [kafka] hachikuji commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r535452252



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1403,7 +1426,9 @@ class Partition(val topicPartition: TopicPartition,
       }
 
       result match {
-        case Left(error: Errors) => error match {
+        case Left(error: Errors) =>
+          isrChangeListener.markFailed()
+          error match {

Review comment:
       nit: the block below is now misaligned
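   With `markFailed()` hoisted above the inner match, so that every error (including the retried ones) is counted, a realigned version of the handler might look like this sketch. It is a simplification under assumptions: `AlterIsrError` and its case objects are hypothetical stand-ins for the `Errors` cases matched in Partition.scala, the `Right` side is reduced to an `Int` version instead of the real result type, and `handleAlterIsrResult` with its `resend` callback stands in for the response handler and `sendAlterIsrRequest`.

```scala
// Listener trait as used by Partition (markShrink assumed by symmetry).
trait IsrChangeListener {
  def markExpand(): Unit
  def markShrink(): Unit
  def markFailed(): Unit
}

// Hypothetical stand-ins for the Errors cases handled in the diff.
sealed trait AlterIsrError
case object UnknownTopicOrPartition extends AlterIsrError
case object FencedLeaderEpoch extends AlterIsrError
case object InvalidUpdateVersion extends AlterIsrError
case object UnexpectedError extends AlterIsrError

// Simple counting listener for illustration.
final class CountingListener extends IsrChangeListener {
  var expands = 0
  var shrinks = 0
  var failures = 0
  override def markExpand(): Unit = expands += 1
  override def markShrink(): Unit = shrinks += 1
  override def markFailed(): Unit = failures += 1
}

// markFailed() runs once for any error, before deciding whether to give
// up or retry; the inner match is indented one level under the Left case.
def handleAlterIsrResult(result: Either[AlterIsrError, Int],
                         listener: IsrChangeListener,
                         resend: () => Unit): String =
  result match {
    case Left(error) =>
      listener.markFailed()
      error match {
        case UnknownTopicOrPartition => "giving up: unknown topic or partition"
        case FencedLeaderEpoch       => "giving up: fenced leader epoch"
        case InvalidUpdateVersion    => "giving up: invalid update version"
        case UnexpectedError =>
          resend()
          "retrying"
      }
    case Right(newVersion) =>
      s"updated to version $newVersion"
  }
```

   Because the retry branch also marks the meter, a broker that keeps hitting an unexpected error shows a steadily rising failure count, which matches the reasoning elsewhere in this thread for marking the metric on retries.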







[GitHub] [kafka] hachikuji commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r534480994



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -45,6 +45,12 @@ import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+trait IsrChangeListener {

Review comment:
       Maybe `IsrChangeMetrics`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -53,7 +59,7 @@ trait PartitionStateStore {
 
 class ZkPartitionStateStore(topicPartition: TopicPartition,
                             zkClient: KafkaZkClient,
-                            replicaManager: ReplicaManager) extends PartitionStateStore {
+                            isrChangeListener: IsrChangeListener) extends PartitionStateStore {

Review comment:
       Maybe we can move this out of `ZkPartitionStateStore`, since we need the dependency on `IsrChangeListener` in `Partition` anyway.
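   One way to read this suggestion, sketched under assumptions: the store only persists the ISR and reports whether the conditional write succeeded, while `Partition` decides which listener method to mark. The `updateIsr(newIsr, expectedVersion)` signature, `InMemoryStateStore`, `PartitionWithStore`, and `CountingListener` are hypothetical; the real `PartitionStateStore` API is not shown in this thread.

```scala
// Listener trait as used by Partition (markShrink assumed by symmetry).
trait IsrChangeListener {
  def markExpand(): Unit
  def markShrink(): Unit
  def markFailed(): Unit
}

// Hypothetical store API: persists the ISR and returns the new version,
// or None if the expected version is stale. It knows nothing about metrics.
trait PartitionStateStore {
  def updateIsr(newIsr: Set[Int], expectedVersion: Int): Option[Int]
}

// Hypothetical in-memory store with a ZK-like conditional update.
final class InMemoryStateStore extends PartitionStateStore {
  private var version = 0
  private var stored: Set[Int] = Set.empty

  override def updateIsr(newIsr: Set[Int], expectedVersion: Int): Option[Int] =
    if (expectedVersion != version) None
    else {
      stored = newIsr
      version += 1
      Some(version)
    }

  def current: Set[Int] = stored
}

// Simple counting listener for illustration.
final class CountingListener extends IsrChangeListener {
  var expands = 0
  var shrinks = 0
  var failures = 0
  override def markExpand(): Unit = expands += 1
  override def markShrink(): Unit = shrinks += 1
  override def markFailed(): Unit = failures += 1
}

// Partition owns the listener; the store no longer needs one.
final class PartitionWithStore(store: PartitionStateStore, listener: IsrChangeListener) {
  private var isr: Set[Int] = Set.empty
  private var isrVersion = 0

  def updateIsr(newIsr: Set[Int]): Boolean =
    store.updateIsr(newIsr, isrVersion) match {
      case Some(newVersion) =>
        // Same-size changes (e.g. a swap) mark neither meter in this sketch.
        if (newIsr.size > isr.size) listener.markExpand()
        else if (newIsr.size < isr.size) listener.markShrink()
        isr = newIsr
        isrVersion = newVersion
        true
      case None =>
        listener.markFailed()
        false
    }
}
```

   Keeping the listener out of the store means a future store backed by AlterIsr instead of ZooKeeper would not need its own copy of the metrics wiring.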







[GitHub] [kafka] hachikuji commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r534571434



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1406,10 +1429,13 @@ class Partition(val topicPartition: TopicPartition,
         case Left(error: Errors) => error match {
           case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
             debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
+            isrChangeMetrics.markFailed()
           case Errors.FENCED_LEADER_EPOCH =>
             debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
+            isrChangeMetrics.markFailed()
           case Errors.INVALID_UPDATE_VERSION =>
             debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
+            isrChangeMetrics.markFailed()
           case _ =>
             warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
             sendAlterIsrRequest(proposedIsrState)

Review comment:
       Hmm, I think we should probably do it. If we hit some unexpected case which the broker can't get out of, we'd want the metric to show it.



