You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/04/05 16:15:04 UTC
[kafka] branch trunk updated: KAFKA-7904;
Add AtMinIsr partition metric and TopicCommand option (KIP-427)
This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 31d191f KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)
31d191f is described below
commit 31d191fc85229c9b0a6e85aad56e8a7975b295e1
Author: Kevin Lu <lu...@berkeley.edu>
AuthorDate: Fri Apr 5 09:14:41 2019 -0700
KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)
- Add `AtMinIsrPartitionCount` metric to `ReplicaManager`
- Add `AtMinIsr` metric to `Partition`
- Add `--at-min-isr-partitions` describe `TopicCommand` option
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103089398
Author: Kevin Lu <lu...@berkeley.edu>
Author: lu.kevin@berkeley.edu <ke...@paypal.com>
Reviewers: Gwen Shapira
Closes #6421 from KevinLiLu/KAFKA-7904
---
core/src/main/scala/kafka/admin/TopicCommand.scala | 28 ++++++++++++++++------
core/src/main/scala/kafka/cluster/Partition.scala | 19 +++++++++++++++
.../main/scala/kafka/server/ReplicaManager.scala | 7 ++++++
.../admin/TopicCommandWithAdminClientTest.scala | 22 +++++++++++++++++
.../scala/unit/kafka/cluster/PartitionTest.scala | 21 ++++++++++++++++
docs/ops.html | 5 ++++
6 files changed, 95 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 0c26fc9..b21989e 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -104,7 +104,7 @@ object TopicCommand extends Logging {
describeConfigs: Boolean)
class DescribeOptions(opts: TopicCommandOptions, liveBrokers: Set[Int]) {
- val describeConfigs: Boolean = !opts.reportUnavailablePartitions && !opts.reportUnderReplicatedPartitions && !opts.reportUnderMinIsrPartitions
+ val describeConfigs: Boolean = !opts.reportUnavailablePartitions && !opts.reportUnderReplicatedPartitions && !opts.reportUnderMinIsrPartitions && !opts.reportAtMinIsrPartitions
val describePartitions: Boolean = !opts.reportOverriddenConfigs
private def hasUnderReplicatedPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size < partitionDescription.assignedReplicas.size
@@ -121,15 +121,22 @@ object TopicCommand extends Logging {
private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size < partitionDescription.minIsrCount
}
+ private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
+ partitionDescription.isr.size == partitionDescription.minIsrCount
+ }
private def shouldPrintUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
opts.reportUnderMinIsrPartitions && hasUnderMinIsrPartitions(partitionDescription)
}
+ private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
+ opts.reportAtMinIsrPartitions && hasAtMinIsrPartitions(partitionDescription)
+ }
def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
describeConfigs ||
shouldPrintUnderReplicatedPartitions(partitionDesc) ||
shouldPrintUnavailablePartitions(partitionDesc) ||
- shouldPrintUnderMinIsrPartitions(partitionDesc)
+ shouldPrintUnderMinIsrPartitions(partitionDesc) ||
+ shouldPrintAtMinIsrPartitions(partitionDesc)
}
}
@@ -228,7 +235,7 @@ object TopicCommand extends Logging {
}
}
if (describeOptions.describePartitions) {
- val computedMinIsrCount = if (opts.reportUnderMinIsrPartitions)
+ val computedMinIsrCount = if (opts.reportUnderMinIsrPartitions || opts.reportAtMinIsrPartitions)
allConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, td.name())).get().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt else 0
for (partition <- sortedPartitions) {
val partitionDesc = PartitionDescription(
@@ -550,6 +557,8 @@ object TopicCommand extends Logging {
"if set when describing topics, only show partitions whose leader is not available")
private val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
"if set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option.")
+ private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions",
+ "if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.")
private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when describing topics, only show topics that have overridden configs")
private val ifExistsOpt = parser.accepts("if-exists",
@@ -568,6 +577,8 @@ object TopicCommand extends Logging {
private val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
+ private val allReplicationReportOpts: Set[OptionSpec[_]] = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)
+
def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def valueAsOption[A](option: OptionSpec[A], defaultValue: Option[A] = None): Option[A] = if (has(option)) Some(options.valueOf(option)) else defaultValue
def valuesAsOption[A](option: OptionSpec[A], defaultValue: Option[util.List[A]] = None): Option[util.List[A]] = if (has(option)) Some(options.valuesOf(option)) else defaultValue
@@ -593,6 +604,7 @@ object TopicCommand extends Logging {
def reportUnderReplicatedPartitions: Boolean = has(reportUnderReplicatedPartitionsOpt)
def reportUnavailablePartitions: Boolean = has(reportUnavailablePartitionsOpt)
def reportUnderMinIsrPartitions: Boolean = has(reportUnderMinIsrPartitionsOpt)
+ def reportAtMinIsrPartitions: Boolean = has(reportAtMinIsrPartitionsOpt)
def reportOverriddenConfigs: Boolean = has(topicsWithOverridesOpt)
def ifExists: Boolean = has(ifExistsOpt)
def ifNotExists: Boolean = has(ifNotExistsOpt)
@@ -636,13 +648,15 @@ object TopicCommand extends Logging {
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) + reportUnderMinIsrPartitionsOpt + reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
+ allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
+ allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
+ allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
+ allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
- allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnderMinIsrPartitionsOpt + reportUnavailablePartitionsOpt)
+ allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 698fb48..30ce756 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -127,6 +127,15 @@ class Partition(val topicPartition: TopicPartition,
tags
)
+ newGauge("AtMinIsr",
+ new Gauge[Int] {
+ def value = {
+ if (isAtMinIsr) 1 else 0
+ }
+ },
+ tags
+ )
+
newGauge("ReplicasCount",
new Gauge[Int] {
def value = {
@@ -162,6 +171,15 @@ class Partition(val topicPartition: TopicPartition,
}
}
+ def isAtMinIsr: Boolean = {
+ leaderReplicaIfLocal match {
+ case Some(leaderReplica) =>
+ inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas
+ case None =>
+ false
+ }
+ }
+
/**
* Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
* does not exist. This method assumes that the current replica has already been created.
@@ -1022,6 +1040,7 @@ class Partition(val topicPartition: TopicPartition,
removeMetric("InSyncReplicasCount", tags)
removeMetric("ReplicasCount", tags)
removeMetric("LastStableOffsetLag", tags)
+ removeMetric("AtMinIsr", tags)
}
override def equals(that: Any): Boolean = that match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c194bd5..71c5f86 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -241,6 +241,12 @@ class ReplicaManager(val config: KafkaConfig,
def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
}
)
+ val atMinIsrPartitionCount = newGauge(
+ "AtMinIsrPartitionCount",
+ new Gauge[Int] {
+ def value = leaderPartitionsIterator.count(_.isAtMinIsr)
+ }
+ )
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
@@ -1482,6 +1488,7 @@ class ReplicaManager(val config: KafkaConfig,
removeMetric("OfflineReplicaCount")
removeMetric("UnderReplicatedPartitions")
removeMetric("UnderMinIsrPartitionCount")
+ removeMetric("AtMinIsrPartitionCount")
}
// High watermark do not need to be checkpointed only when under unit tests
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index f963d8f..d1eac2d 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -596,6 +596,28 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
}
}
+ @Test
+ def testDescribeAtMinIsrPartitions(): Unit = {
+ val configMap = new java.util.HashMap[String, String]()
+ configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
+
+ adminClient.createTopics(
+ Collections.singletonList(new NewTopic(testTopicName, 1, 6).configs(configMap))).all().get()
+ waitForTopicCreated(testTopicName)
+
+ try {
+ killBroker(0)
+ killBroker(1)
+ val output = TestUtils.grabConsoleOutput(
+ topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions"))))
+ val rows = output.split("\n")
+ assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
+ assertEquals(1, rows.length);
+ } finally {
+ restartDeadBrokers()
+ }
+ }
+
/**
* Test describe --under-min-isr-partitions option with four topics:
* (1) topic with partition under the configured min ISR count
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index ba53cd0..769cc67 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1035,4 +1035,25 @@ class PartitionTest {
builder.build()
}
+ /**
+ * Test for AtMinIsr partition state. We set the partition replica set size as 3, but only set one replica as an ISR.
+ * As the default minIsr configuration is 1, then the partition should be at min ISR (isAtMinIsr = true).
+ */
+ @Test
+ def testAtMinIsr(): Unit = {
+ val controllerEpoch = 3
+ val leader = brokerId
+ val follower1 = brokerId + 1
+ val follower2 = brokerId + 2
+ val controllerId = brokerId + 3
+ val replicas = List[Integer](leader, follower1, follower2).asJava
+ val isr = List[Integer](leader).asJava
+ val leaderEpoch = 8
+
+ val partition = Partition(topicPartition, time, replicaManager)
+ assertFalse(partition.isAtMinIsr)
+ // Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1)
+ partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)
+ assertTrue(partition.isAtMinIsr)
+ }
}
diff --git a/docs/ops.html b/docs/ops.html
index 5ba9def..e14231c 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -866,6 +866,11 @@
<td>0</td>
</tr>
<tr>
+ <td># of at minIsr partitions (|ISR| = min.insync.replicas)</td>
+ <td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
+ <td>0</td>
+ </tr>
+ <tr>
<td># of offline log directories</td>
<td>kafka.log:type=LogManager,name=OfflineLogDirectoryCount</td>
<td>0</td>