You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/04/05 16:15:04 UTC

[kafka] branch trunk updated: KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 31d191f  KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)
31d191f is described below

commit 31d191fc85229c9b0a6e85aad56e8a7975b295e1
Author: Kevin Lu <lu...@berkeley.edu>
AuthorDate: Fri Apr 5 09:14:41 2019 -0700

    KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)
    
    - Add `AtMinIsrPartitionCount` metric to `ReplicaManager`
    - Add `AtMinIsr` metric to `Partition`
    - Add `--at-min-isr-partitions` describe `TopicCommand` option
    
    https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103089398
    
    Author: Kevin Lu <lu...@berkeley.edu>
    Author: lu.kevin@berkeley.edu <ke...@paypal.com>
    
    Reviewers: Gwen Shapira
    
    Closes #6421 from KevinLiLu/KAFKA-7904
---
 core/src/main/scala/kafka/admin/TopicCommand.scala | 28 ++++++++++++++++------
 core/src/main/scala/kafka/cluster/Partition.scala  | 19 +++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++++++
 .../admin/TopicCommandWithAdminClientTest.scala    | 22 +++++++++++++++++
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 21 ++++++++++++++++
 docs/ops.html                                      |  5 ++++
 6 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 0c26fc9..b21989e 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -104,7 +104,7 @@ object TopicCommand extends Logging {
                                    describeConfigs: Boolean)
 
   class DescribeOptions(opts: TopicCommandOptions, liveBrokers: Set[Int]) {
-    val describeConfigs: Boolean = !opts.reportUnavailablePartitions && !opts.reportUnderReplicatedPartitions && !opts.reportUnderMinIsrPartitions
+    val describeConfigs: Boolean = !opts.reportUnavailablePartitions && !opts.reportUnderReplicatedPartitions && !opts.reportUnderMinIsrPartitions && !opts.reportAtMinIsrPartitions
     val describePartitions: Boolean = !opts.reportOverriddenConfigs
     private def hasUnderReplicatedPartitions(partitionDescription: PartitionDescription) = {
       partitionDescription.isr.size < partitionDescription.assignedReplicas.size
@@ -121,15 +121,22 @@ object TopicCommand extends Logging {
     private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
       partitionDescription.isr.size < partitionDescription.minIsrCount
     }
+    private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
+      partitionDescription.isr.size == partitionDescription.minIsrCount
+    }
     private def shouldPrintUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
       opts.reportUnderMinIsrPartitions && hasUnderMinIsrPartitions(partitionDescription)
     }
+    private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
+      opts.reportAtMinIsrPartitions && hasAtMinIsrPartitions(partitionDescription)
+    }
 
     def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
       describeConfigs ||
         shouldPrintUnderReplicatedPartitions(partitionDesc) ||
         shouldPrintUnavailablePartitions(partitionDesc) ||
-        shouldPrintUnderMinIsrPartitions(partitionDesc)
+        shouldPrintUnderMinIsrPartitions(partitionDesc) ||
+        shouldPrintAtMinIsrPartitions(partitionDesc)
     }
   }
 
@@ -228,7 +235,7 @@ object TopicCommand extends Logging {
           }
         }
         if (describeOptions.describePartitions) {
-          val computedMinIsrCount = if (opts.reportUnderMinIsrPartitions)
+          val computedMinIsrCount = if (opts.reportUnderMinIsrPartitions || opts.reportAtMinIsrPartitions)
             allConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, td.name())).get().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt else 0
           for (partition <- sortedPartitions) {
             val partitionDesc = PartitionDescription(
@@ -550,6 +557,8 @@ object TopicCommand extends Logging {
                                                             "if set when describing topics, only show partitions whose leader is not available")
     private val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
                                                             "if set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option.")
+    private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions",
+                                                            "if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.")
     private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
                                                 "if set when describing topics, only show topics that have overridden configs")
     private val ifExistsOpt = parser.accepts("if-exists",
@@ -568,6 +577,8 @@ object TopicCommand extends Logging {
 
     private val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
 
+    private val allReplicationReportOpts: Set[OptionSpec[_]] = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)
+
     def has(builder: OptionSpec[_]): Boolean = options.has(builder)
     def valueAsOption[A](option: OptionSpec[A], defaultValue: Option[A] = None): Option[A] = if (has(option)) Some(options.valueOf(option)) else defaultValue
     def valuesAsOption[A](option: OptionSpec[A], defaultValue: Option[util.List[A]] = None): Option[util.List[A]] = if (has(option)) Some(options.valuesOf(option)) else defaultValue
@@ -593,6 +604,7 @@ object TopicCommand extends Logging {
     def reportUnderReplicatedPartitions: Boolean = has(reportUnderReplicatedPartitionsOpt)
     def reportUnavailablePartitions: Boolean = has(reportUnavailablePartitionsOpt)
     def reportUnderMinIsrPartitions: Boolean = has(reportUnderMinIsrPartitionsOpt)
+    def reportAtMinIsrPartitions: Boolean = has(reportAtMinIsrPartitionsOpt)
     def reportOverriddenConfigs: Boolean = has(topicsWithOverridesOpt)
     def ifExists: Boolean = has(ifExistsOpt)
     def ifNotExists: Boolean = has(ifNotExistsOpt)
@@ -636,13 +648,15 @@ object TopicCommand extends Logging {
       if(options.has(createOpt))
           CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) + reportUnderMinIsrPartitionsOpt + reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
+        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
+        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
+        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
+        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
-        allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnderMinIsrPartitionsOpt + reportUnavailablePartitionsOpt)
+        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
       CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 698fb48..30ce756 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -127,6 +127,15 @@ class Partition(val topicPartition: TopicPartition,
       tags
     )
 
+    newGauge("AtMinIsr",
+      new Gauge[Int] {
+        def value = {
+          if (isAtMinIsr) 1 else 0
+        }
+      },
+      tags
+    )
+
     newGauge("ReplicasCount",
       new Gauge[Int] {
         def value = {
@@ -162,6 +171,15 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  def isAtMinIsr: Boolean = {
+    leaderReplicaIfLocal match {
+      case Some(leaderReplica) =>
+        inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas
+      case None =>
+        false
+    }
+  }
+
   /**
     * Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
     * does not exist. This method assumes that the current replica has already been created.
@@ -1022,6 +1040,7 @@ class Partition(val topicPartition: TopicPartition,
     removeMetric("InSyncReplicasCount", tags)
     removeMetric("ReplicasCount", tags)
     removeMetric("LastStableOffsetLag", tags)
+    removeMetric("AtMinIsr", tags)
   }
 
   override def equals(that: Any): Boolean = that match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c194bd5..71c5f86 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -241,6 +241,12 @@ class ReplicaManager(val config: KafkaConfig,
       def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
     }
   )
+  val atMinIsrPartitionCount = newGauge(
+    "AtMinIsrPartitionCount",
+    new Gauge[Int] {
+      def value = leaderPartitionsIterator.count(_.isAtMinIsr)
+    }
+  )
 
   val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
   val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
@@ -1482,6 +1488,7 @@ class ReplicaManager(val config: KafkaConfig,
     removeMetric("OfflineReplicaCount")
     removeMetric("UnderReplicatedPartitions")
     removeMetric("UnderMinIsrPartitionCount")
+    removeMetric("AtMinIsrPartitionCount")
   }
 
   // High watermark do not need to be checkpointed only when under unit tests
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index f963d8f..d1eac2d 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -596,6 +596,28 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     }
   }
 
+  @Test
+  def testDescribeAtMinIsrPartitions(): Unit = {
+    val configMap = new java.util.HashMap[String, String]()
+    configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
+
+    adminClient.createTopics(
+      Collections.singletonList(new NewTopic(testTopicName, 1, 6).configs(configMap))).all().get()
+    waitForTopicCreated(testTopicName)
+
+    try {
+      killBroker(0)
+      killBroker(1)
+      val output = TestUtils.grabConsoleOutput(
+        topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions"))))
+      val rows = output.split("\n")
+      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
+      assertEquals(1, rows.length);
+    } finally {
+      restartDeadBrokers()
+    }
+  }
+
   /**
     * Test describe --under-min-isr-partitions option with four topics:
     *   (1) topic with partition under the configured min ISR count
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index ba53cd0..769cc67 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1035,4 +1035,25 @@ class PartitionTest {
     builder.build()
   }
 
+  /**
+    * Test for the AtMinIsr partition state. The partition is assigned three replicas, but only the leader is in the ISR.
+    * Because the default minIsr configuration is 1, the partition should be at min ISR (isAtMinIsr = true).
+    */
+  @Test
+  def testAtMinIsr(): Unit = {
+    val controllerEpoch = 3
+    val leader = brokerId
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val controllerId = brokerId + 3
+    val replicas = List[Integer](leader, follower1, follower2).asJava
+    val isr = List[Integer](leader).asJava
+    val leaderEpoch = 8
+
+    val partition = Partition(topicPartition, time, replicaManager)
+    assertFalse(partition.isAtMinIsr)
+    // Set the ISR to contain only the leader, which triggers AtMinIsr (the default min ISR config is 1)
+    partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)
+    assertTrue(partition.isAtMinIsr)
+  }
 }
diff --git a/docs/ops.html b/docs/ops.html
index 5ba9def..e14231c 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -866,6 +866,11 @@
         <td>0</td>
       </tr>
       <tr>
+        <td># of at minIsr partitions (|ISR| = min.insync.replicas)</td>
+        <td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
+        <td>0</td>
+      </tr>
+      <tr>
         <td># of offline log directories</td>
         <td>kafka.log:type=LogManager,name=OfflineLogDirectoryCount</td>
         <td>0</td>