You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 07:52:42 UTC
kafka git commit: KAFKA-5266; Follow-up improvements for consumer offset reset tool (KIP-122)
Repository: kafka
Updated Branches:
refs/heads/trunk 2cc8f48ae -> ef9551297
KAFKA-5266; Follow-up improvements for consumer offset reset tool (KIP-122)
Implement improvements defined here: https://issues.apache.org/jira/browse/KAFKA-5266
Author: Jorge Quilcate Otoya <qu...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3102 from jeqo/feature/KAFKA-5266
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef955129
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef955129
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef955129
Branch: refs/heads/trunk
Commit: ef9551297c815a0ac3065a65a0831863090714f0
Parents: 2cc8f48
Author: Jorge Quilcate Otoya <qu...@gmail.com>
Authored: Wed May 31 00:42:43 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 31 00:50:48 2017 -0700
----------------------------------------------------------------------
.../kafka/admin/ConsumerGroupCommand.scala | 88 ++++++++++++--------
.../admin/ResetConsumerGroupOffsetTest.scala | 33 ++++++--
2 files changed, 80 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef955129/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index fb589a2..2f26f57 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -106,8 +106,7 @@ object ConsumerGroupCommand extends Logging {
}
else if (opts.options.has(opts.resetOffsetsOpt)) {
val offsetsToReset = consumerGroupService.resetOffsets()
- val export = opts.options.has(opts.exportOpt)
- if (export) {
+ if (opts.options.has(opts.exportOpt)) {
val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset)
println(exported)
} else
@@ -523,7 +522,7 @@ object ConsumerGroupCommand extends Logging {
val groupId = opts.options.valueOf(opts.groupOpt)
val consumerGroupSummary = adminClient.describeConsumerGroup(groupId, opts.options.valueOf(opts.timeoutMsOpt))
consumerGroupSummary.state match {
- case "Empty" =>
+ case "Empty" | "Dead" =>
val partitionsToReset = getPartitionsToReset(groupId)
val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
val execute = opts.options.has(opts.executeOpt)
@@ -536,7 +535,7 @@ object ConsumerGroupCommand extends Logging {
}
}
- private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Iterable[TopicPartition] = topicArgs.flatMap {
+ private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
case topicArg if topicArg.contains(":") =>
val topicAndPartitions = topicArg.split(":")
val topic = topicAndPartitions(0)
@@ -545,14 +544,18 @@ object ConsumerGroupCommand extends Logging {
.map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
}
- private def getPartitionsToReset(groupId: String): Iterable[TopicPartition] = {
+ private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
if (opts.options.has(opts.allTopicsOpt)) {
- adminClient.listGroupOffsets(groupId).keys
+ val allTopicPartitions = adminClient.listGroupOffsets(groupId).keys.toSeq
+ allTopicPartitions
} else if (opts.options.has(opts.topicOpt)) {
val topics = opts.options.valuesOf(opts.topicOpt).asScala
parseTopicPartitionsToReset(topics)
} else {
- CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
+ if (opts.options.has(opts.resetFromFileOpt))
+ Nil
+ else
+ CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
}
}
@@ -570,20 +573,22 @@ object ConsumerGroupCommand extends Logging {
if (opts.options.has(opts.resetToOffsetOpt)) {
val offset = opts.options.valueOf(opts.resetToOffsetOpt)
partitionsToReset.map {
- topicPartition => (topicPartition, new OffsetAndMetadata(offset))
+ topicPartition =>
+ val newOffset: Long = checkOffsetRange(topicPartition, offset)
+ (topicPartition, new OffsetAndMetadata(newOffset))
}.toMap
} else if (opts.options.has(opts.resetToEarliestOpt)) {
partitionsToReset.map { topicPartition =>
getLogStartOffset(topicPartition) match {
case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
- case _ => null
+ case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetToLatestOpt)) {
partitionsToReset.map { topicPartition =>
getLogEndOffset(topicPartition) match {
case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
- case _ => null
+ case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetShiftByOpt)) {
@@ -592,21 +597,8 @@ object ConsumerGroupCommand extends Logging {
val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset"))
-
val shiftedOffset = currentOffset + shiftBy
- val newOffset = getLogEndOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(endOffset) if shiftedOffset > endOffset =>
- warn(s"New offset ($shiftedOffset) is higher than latest offset. Value will be set to $endOffset")
- endOffset
-
- case _ => getLogStartOffset(topicPartition) match {
- case LogOffsetResult.LogOffset(startOffset) if shiftedOffset < startOffset =>
- warn(s"New offset ($shiftedOffset) is lower than earliest offset. Value will be set to $startOffset")
- startOffset
-
- case _ => shiftedOffset
- }
- }
+ val newOffset: Long = checkOffsetRange(topicPartition, shiftedOffset)
(topicPartition, new OffsetAndMetadata(newOffset))
}.toMap
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
@@ -615,7 +607,7 @@ object ConsumerGroupCommand extends Logging {
val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
logTimestampOffset match {
case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
- case _ => null
+ case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetByDurationOpt)) {
@@ -628,25 +620,47 @@ object ConsumerGroupCommand extends Logging {
val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
logTimestampOffset match {
case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
- case _ => null
+ case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetFromFileOpt)) {
val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
val resetPlan = parseResetPlan(resetPlanCsv)
- partitionsToReset.map { topicPartition =>
- if (resetPlan.keySet.contains(topicPartition))
- (topicPartition, resetPlan(topicPartition))
- else null
+ resetPlan.keySet.map { topicPartition =>
+ val newOffset: Long = checkOffsetRange(topicPartition, resetPlan(topicPartition).offset())
+ (topicPartition, new OffsetAndMetadata(newOffset))
}.toMap
- } else {
+ } else if (opts.options.has(opts.resetToCurrentOpt)) {
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
partitionsToReset.map { topicPartition =>
currentCommittedOffsets.get(topicPartition).map { offset =>
(topicPartition, new OffsetAndMetadata(offset))
- }.orNull
+ }.getOrElse(
+ getLogEndOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+ case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
+ }
+ )
}.toMap
+ } else {
+ CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
+ }
+ }
+
+ private def checkOffsetRange(topicPartition: TopicPartition, offset: Long) = {
+ getLogEndOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(endOffset) if offset > endOffset =>
+ warn(s"New offset ($offset) is higher than latest offset. Value will be set to $endOffset")
+ endOffset
+
+ case _ => getLogStartOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(startOffset) if offset < startOffset =>
+ warn(s"New offset ($offset) is lower than earliest offset. Value will be set to $startOffset")
+ startOffset
+
+ case _ => offset
+ }
}
}
@@ -701,8 +715,8 @@ object ConsumerGroupCommand extends Logging {
val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl +
"Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
- "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file. And by default it resets to current offset." + nl +
- "To define the scope use: --all-topics or --topic"
+ "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario must be choose" + nl +
+ "To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario"
val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
val ResetToOffsetDoc = "Reset offsets to a specific offset."
@@ -711,6 +725,7 @@ object ConsumerGroupCommand extends Logging {
val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"
val ResetToEarliestDoc = "Reset offsets to earliest offset."
val ResetToLatestDoc = "Reset offsets to latest offset."
+ val ResetToCurrentDoc = "Reset offsets to current offset."
val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative"
val parser = new OptionParser(false)
@@ -765,6 +780,7 @@ object ConsumerGroupCommand extends Logging {
.ofType(classOf[String])
val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc)
val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc)
+ val resetToCurrentOpt = parser.accepts("to-current", ResetToCurrentDoc)
val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc)
.withRequiredArg()
.describedAs("number-of-offsets")
@@ -776,7 +792,7 @@ object ConsumerGroupCommand extends Logging {
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
- resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetFromFileOpt)
+ resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)
def checkArgs() {
// check required args
@@ -808,9 +824,11 @@ object ConsumerGroupCommand extends Logging {
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
+
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef955129/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 67d03e9..22958a9 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -75,7 +75,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
def testResetOffsetsNotExistingGroup() {
createConsumerGroupExecutor(brokerList, 1, group, topic1)
- val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics")
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics", "--to-current")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@@ -87,15 +87,33 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}
@Test
+ def testResetOffsetsNewConsumerExistingTopic(): Unit = {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 })
+ }, "Expected the consumer group to reset to offset 1 (specific offset).")
+
+ printConsumerGroup("new.group")
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
def testResetOffsetsToLocalDateTime() {
AdminUtils.createTopic(zkUtils, topic1, 1, 1)
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
- val checkpoint = new Date()
val calendar = Calendar.getInstance()
calendar.add(Calendar.DATE, -1)
-
TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
@@ -259,7 +277,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
@Test
def testResetOffsetsToCurrentOffset() {
- val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--execute")
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
@@ -269,7 +287,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
TestUtils.waitUntilTrue(() => {
val assignmentsToReset = consumerGroupCommand.resetOffsets()
assignmentsToReset.exists({ assignment => assignment._2.offset() == 100 })
@@ -283,7 +300,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
-
TestUtils.waitUntilTrue(() => {
val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
assignmentsOption match {
@@ -538,6 +554,11 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
ConsumerGroupCommand.main(cgcArgs)
}
+ private def printConsumerGroup(group: String) {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, "--describe")
+ ConsumerGroupCommand.main(cgcArgs)
+ }
+
private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId: String, topic: String): ConsumerGroupExecutor = {
val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic)
executors += executor