You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 07:52:42 UTC

kafka git commit: KAFKA-5266; Follow-up improvements for consumer offset reset tool (KIP-122)

Repository: kafka
Updated Branches:
  refs/heads/trunk 2cc8f48ae -> ef9551297


KAFKA-5266; Follow-up improvements for consumer offset reset tool (KIP-122)

Implement improvements defined here: https://issues.apache.org/jira/browse/KAFKA-5266

Author: Jorge Quilcate Otoya <qu...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3102 from jeqo/feature/KAFKA-5266


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef955129
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef955129
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef955129

Branch: refs/heads/trunk
Commit: ef9551297c815a0ac3065a65a0831863090714f0
Parents: 2cc8f48
Author: Jorge Quilcate Otoya <qu...@gmail.com>
Authored: Wed May 31 00:42:43 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 31 00:50:48 2017 -0700

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 88 ++++++++++++--------
 .../admin/ResetConsumerGroupOffsetTest.scala    | 33 ++++++--
 2 files changed, 80 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef955129/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index fb589a2..2f26f57 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -106,8 +106,7 @@ object ConsumerGroupCommand extends Logging {
       }
       else if (opts.options.has(opts.resetOffsetsOpt)) {
         val offsetsToReset = consumerGroupService.resetOffsets()
-        val export = opts.options.has(opts.exportOpt)
-        if (export) {
+        if (opts.options.has(opts.exportOpt)) {
           val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset)
           println(exported)
         } else
@@ -523,7 +522,7 @@ object ConsumerGroupCommand extends Logging {
       val groupId = opts.options.valueOf(opts.groupOpt)
       val consumerGroupSummary = adminClient.describeConsumerGroup(groupId, opts.options.valueOf(opts.timeoutMsOpt))
       consumerGroupSummary.state match {
-        case "Empty" =>
+        case "Empty" | "Dead" =>
           val partitionsToReset = getPartitionsToReset(groupId)
           val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
           val execute = opts.options.has(opts.executeOpt)
@@ -536,7 +535,7 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Iterable[TopicPartition] = topicArgs.flatMap {
+    private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
       case topicArg if topicArg.contains(":") =>
         val topicAndPartitions = topicArg.split(":")
         val topic = topicAndPartitions(0)
@@ -545,14 +544,18 @@ object ConsumerGroupCommand extends Logging {
         .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
     }
 
-    private def getPartitionsToReset(groupId: String): Iterable[TopicPartition] = {
+    private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
       if (opts.options.has(opts.allTopicsOpt)) {
-        adminClient.listGroupOffsets(groupId).keys
+        val allTopicPartitions = adminClient.listGroupOffsets(groupId).keys.toSeq
+        allTopicPartitions
       } else if (opts.options.has(opts.topicOpt)) {
         val topics = opts.options.valuesOf(opts.topicOpt).asScala
         parseTopicPartitionsToReset(topics)
       } else {
-        CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
+        if (opts.options.has(opts.resetFromFileOpt))
+          Nil
+        else
+          CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
       }
     }
 
@@ -570,20 +573,22 @@ object ConsumerGroupCommand extends Logging {
       if (opts.options.has(opts.resetToOffsetOpt)) {
         val offset = opts.options.valueOf(opts.resetToOffsetOpt)
         partitionsToReset.map {
-          topicPartition => (topicPartition, new OffsetAndMetadata(offset))
+          topicPartition =>
+            val newOffset: Long = checkOffsetRange(topicPartition, offset)
+            (topicPartition, new OffsetAndMetadata(newOffset))
         }.toMap
       } else if (opts.options.has(opts.resetToEarliestOpt)) {
         partitionsToReset.map { topicPartition =>
           getLogStartOffset(topicPartition) match {
             case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
-            case _ => null
+            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetToLatestOpt)) {
         partitionsToReset.map { topicPartition =>
           getLogEndOffset(topicPartition) match {
             case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
-            case _ => null
+            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetShiftByOpt)) {
@@ -592,21 +597,8 @@ object ConsumerGroupCommand extends Logging {
           val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
           val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
             throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset"))
-
           val shiftedOffset = currentOffset + shiftBy
-          val newOffset = getLogEndOffset(topicPartition) match {
-            case LogOffsetResult.LogOffset(endOffset) if shiftedOffset > endOffset =>
-              warn(s"New offset ($shiftedOffset) is higher than latest offset. Value will be set to $endOffset")
-              endOffset
-
-            case _ => getLogStartOffset(topicPartition) match {
-              case LogOffsetResult.LogOffset(startOffset) if shiftedOffset < startOffset =>
-                warn(s"New offset ($shiftedOffset) is lower than earliest offset. Value will be set to $startOffset")
-                startOffset
-
-              case _ => shiftedOffset
-            }
-          }
+          val newOffset: Long = checkOffsetRange(topicPartition, shiftedOffset)
           (topicPartition, new OffsetAndMetadata(newOffset))
         }.toMap
       } else if (opts.options.has(opts.resetToDatetimeOpt)) {
@@ -615,7 +607,7 @@ object ConsumerGroupCommand extends Logging {
           val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
           logTimestampOffset match {
             case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
-            case _ => null
+            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetByDurationOpt)) {
@@ -628,25 +620,47 @@ object ConsumerGroupCommand extends Logging {
           val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
           logTimestampOffset match {
             case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
-            case _ => null
+            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetFromFileOpt)) {
         val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
         val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
         val resetPlan = parseResetPlan(resetPlanCsv)
-        partitionsToReset.map { topicPartition =>
-            if (resetPlan.keySet.contains(topicPartition))
-              (topicPartition, resetPlan(topicPartition))
-            else null
+        resetPlan.keySet.map { topicPartition =>
+          val newOffset: Long = checkOffsetRange(topicPartition, resetPlan(topicPartition).offset())
+          (topicPartition, new OffsetAndMetadata(newOffset))
         }.toMap
-      } else {
+      } else if (opts.options.has(opts.resetToCurrentOpt)) {
         val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
         partitionsToReset.map { topicPartition =>
           currentCommittedOffsets.get(topicPartition).map { offset =>
             (topicPartition, new OffsetAndMetadata(offset))
-          }.orNull
+          }.getOrElse(
+            getLogEndOffset(topicPartition) match {
+              case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+              case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
+            }
+          )
         }.toMap
+      } else {
+        CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
+      }
+    }
+
+    private def checkOffsetRange(topicPartition: TopicPartition, offset: Long) = {
+      getLogEndOffset(topicPartition) match {
+        case LogOffsetResult.LogOffset(endOffset) if offset > endOffset =>
+          warn(s"New offset ($offset) is higher than latest offset. Value will be set to $endOffset")
+          endOffset
+
+        case _ => getLogStartOffset(topicPartition) match {
+          case LogOffsetResult.LogOffset(startOffset) if offset < startOffset =>
+            warn(s"New offset ($offset) is lower than earliest offset. Value will be set to $startOffset")
+            startOffset
+
+          case _ => offset
+        }
       }
     }
 
@@ -701,8 +715,8 @@ object ConsumerGroupCommand extends Logging {
     val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
     val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl +
       "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
-      "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file. And by default it resets to current offset." + nl +
-      "To define the scope use: --all-topics or --topic"
+      "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario must be choose" + nl +
+      "To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario"
     val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
     val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
     val ResetToOffsetDoc = "Reset offsets to a specific offset."
@@ -711,6 +725,7 @@ object ConsumerGroupCommand extends Logging {
     val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"
     val ResetToEarliestDoc = "Reset offsets to earliest offset."
     val ResetToLatestDoc = "Reset offsets to latest offset."
+    val ResetToCurrentDoc = "Reset offsets to current offset."
     val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative"
 
     val parser = new OptionParser(false)
@@ -765,6 +780,7 @@ object ConsumerGroupCommand extends Logging {
                                    .ofType(classOf[String])
     val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc)
     val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc)
+    val resetToCurrentOpt = parser.accepts("to-current", ResetToCurrentDoc)
     val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc)
                              .withRequiredArg()
                              .describedAs("number-of-offsets")
@@ -776,7 +792,7 @@ object ConsumerGroupCommand extends Logging {
 
     val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
     val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
-      resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetFromFileOpt)
+      resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)
 
     def checkArgs() {
       // check required args
@@ -808,9 +824,11 @@ object ConsumerGroupCommand extends Logging {
         CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
 
+
       // check invalid args
       CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef955129/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 67d03e9..22958a9 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -75,7 +75,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   def testResetOffsetsNotExistingGroup() {
     createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics", "--to-current")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -87,15 +87,33 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testResetOffsetsNewConsumerExistingTopic(): Unit = {
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50", "--execute")
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val assignmentsToReset = consumerGroupCommand.resetOffsets()
+      assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 })
+    }, "Expected the consumer group to reset to offset 1 (specific offset).")
+
+    printConsumerGroup("new.group")
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    consumerGroupCommand.close()
+  }
+
+  @Test
   def testResetOffsetsToLocalDateTime() {
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
-    val checkpoint = new Date()
     val calendar = Calendar.getInstance()
     calendar.add(Calendar.DATE, -1)
 
-
     TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
@@ -259,7 +277,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToCurrentOffset() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -269,7 +287,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
 
-
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand.resetOffsets()
       assignmentsToReset.exists({ assignment => assignment._2.offset() == 100 })
@@ -283,7 +300,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
     val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
 
-
     TestUtils.waitUntilTrue(() => {
       val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
       assignmentsOption match {
@@ -538,6 +554,11 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     ConsumerGroupCommand.main(cgcArgs)
   }
 
+  private def printConsumerGroup(group: String) {
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, "--describe")
+    ConsumerGroupCommand.main(cgcArgs)
+  }
+
   private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId: String, topic: String): ConsumerGroupExecutor = {
     val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic)
     executors += executor