You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/17 21:27:00 UTC
kafka git commit: KAFKA-4743;
[KIP-122] Add Reset Consumer Group Offsets tooling
Repository: kafka
Updated Branches:
refs/heads/trunk 6910baf54 -> 2181ae768
KAFKA-4743; [KIP-122] Add Reset Consumer Group Offsets tooling
Author: Jorge Quilcate <qu...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #2624 from jeqo/feature/rewind-consumer-group-offset
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2181ae76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2181ae76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2181ae76
Branch: refs/heads/trunk
Commit: 2181ae768719a9ae3a929ba875faa89c67edf643
Parents: 6910baf
Author: Jorge Quilcate <qu...@gmail.com>
Authored: Wed May 17 14:24:27 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 17 14:24:40 2017 -0700
----------------------------------------------------------------------
.../kafka/admin/ConsumerGroupCommand.scala | 302 +++++++++-
.../admin/ResetConsumerGroupOffsetTest.scala | 601 +++++++++++++++++++
2 files changed, 874 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2181ae76/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index dd7a477..69f0d8a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -17,7 +17,9 @@
package kafka.admin
-import java.util.Properties
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties}
+import javax.xml.datatype.DatatypeFactory
import joptsimple.{OptionParser, OptionSpec}
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
@@ -27,7 +29,7 @@ import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.errors.BrokerNotAvailableException
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import org.apache.kafka.common.internals.Topic
@@ -38,7 +40,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
+import scala.collection.{Seq, Set, mutable}
object ConsumerGroupCommand extends Logging {
@@ -46,12 +48,12 @@ object ConsumerGroupCommand extends Logging {
val opts = new ConsumerGroupCommandOptions(args)
if (args.length == 0)
- CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
+ CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
// should have exactly one action
- val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
+ val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has _)
if (actions != 1)
- CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
+ CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offset")
opts.checkArgs()
@@ -102,6 +104,15 @@ object ConsumerGroupCommand extends Logging {
case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
}
}
+ else if (opts.options.has(opts.resetOffsetsOpt)) {
+ val offsetsToReset = consumerGroupService.resetOffsets()
+ val export = opts.options.has(opts.exportOpt)
+ if (export) {
+ val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset)
+ println(exported)
+ } else
+ printOffsetsToReset(offsetsToReset)
+ }
} catch {
case e: Throwable =>
printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
@@ -134,6 +145,20 @@ object ConsumerGroupCommand extends Logging {
}
}
+ def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
+ print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
+ println()
+
+ groupAssignmentsToReset.foreach {
+ case (consumerAssignment, offsetAndMetadata) =>
+ print("%-30s %-10s %-15s".format(
+ consumerAssignment.topic(),
+ consumerAssignment.partition(),
+ offsetAndMetadata.offset()))
+ println()
+ }
+ }
+
protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String],
partition: Option[Int], offset: Option[Long], lag: Option[Long],
consumerId: Option[String], host: Option[String],
@@ -151,7 +176,7 @@ object ConsumerGroupCommand extends Logging {
protected def opts: ConsumerGroupCommandOptions
- protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult
+ protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult
protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]])
@@ -195,12 +220,16 @@ object ConsumerGroupCommand extends Logging {
clientIdOpt, logEndOffsetOpt)
getLogEndOffset(new TopicPartition(topic, partition)) match {
- case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
- case LogEndOffsetResult.Unknown => getDescribePartitionResult(None)
- case LogEndOffsetResult.Ignore => null
+ case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
+ case LogOffsetResult.Unknown => getDescribePartitionResult(None)
+ case LogOffsetResult.Ignore => null
}
}
+
+ def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
+
+ def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
}
class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
@@ -278,20 +307,20 @@ object ConsumerGroupCommand extends Logging {
}
}
- protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = {
+ protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match {
- case Some(-1) => LogEndOffsetResult.Unknown
+ case Some(-1) => LogOffsetResult.Unknown
case Some(brokerId) =>
getZkConsumer(brokerId).map { consumer =>
val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
consumer.close()
- LogEndOffsetResult.LogEndOffset(logEndOffset)
- }.getOrElse(LogEndOffsetResult.Ignore)
+ LogOffsetResult.LogOffset(logEndOffset)
+ }.getOrElse(LogOffsetResult.Ignore)
case None =>
printError(s"No broker for partition '$topicPartition'")
- LogEndOffsetResult.Ignore
+ LogOffsetResult.Ignore
}
}
@@ -380,7 +409,6 @@ object ConsumerGroupCommand extends Logging {
None
}
}
-
}
class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
@@ -434,12 +462,29 @@ object ConsumerGroupCommand extends Logging {
)
}
- protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = {
+ protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
+ val consumer = getConsumer()
+ val offsets = consumer.endOffsets(List(topicPartition).asJava)
+ val logStartOffset = offsets.get(topicPartition)
+ LogOffsetResult.LogOffset(logStartOffset)
+ }
+
+ protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = {
+ val consumer = getConsumer()
+ val offsets = consumer.beginningOffsets(List(topicPartition).asJava)
+ val logStartOffset = offsets.get(topicPartition)
+ LogOffsetResult.LogOffset(logStartOffset)
+ }
+
+ protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long): LogOffsetResult = {
val consumer = getConsumer()
consumer.assign(List(topicPartition).asJava)
- consumer.seekToEnd(List(topicPartition).asJava)
- val logEndOffset = consumer.position(topicPartition)
- LogEndOffsetResult.LogEndOffset(logEndOffset)
+ val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava)
+ if (offsetsForTimes != null && !offsetsForTimes.isEmpty)
+ LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset)
+ else {
+ getLogEndOffset(topicPartition)
+ }
}
def close() {
@@ -474,14 +519,160 @@ object ConsumerGroupCommand extends Logging {
new KafkaConsumer(properties)
}
+ override def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = {
+ val groupId = opts.options.valueOf(opts.groupOpt)
+ val consumerGroupSummary = adminClient.describeConsumerGroup(groupId, opts.options.valueOf(opts.timeoutMsOpt))
+ consumerGroupSummary.state match {
+ case "Empty" =>
+ val partitionsToReset = getPartitionsToReset(groupId)
+ val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
+ val execute = opts.options.has(opts.executeOpt)
+ if (execute)
+ getConsumer().commitSync(preparedOffsets.asJava)
+ preparedOffsets
+ case currentState =>
+ printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
+ Map.empty
+ }
+ }
+
+ private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Iterable[TopicPartition] = topicArgs.flatMap {
+ case topicArg if topicArg.contains(":") =>
+ val topicAndPartitions = topicArg.split(":")
+ val topic = topicAndPartitions(0)
+ topicAndPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
+ case topic => getConsumer().partitionsFor(topic).asScala
+ .map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
+ }
+
+ private def getPartitionsToReset(groupId: String): Iterable[TopicPartition] = {
+ if (opts.options.has(opts.allTopicsOpt)) {
+ adminClient.listGroupOffsets(groupId).keys
+ } else if (opts.options.has(opts.topicOpt)) {
+ val topics = opts.options.valuesOf(opts.topicOpt).asScala
+ parseTopicPartitionsToReset(topics)
+ } else {
+ CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
+ }
+ }
+
+ private def parseResetPlan(resetPlanCsv: String): Map[TopicPartition, OffsetAndMetadata] = {
+ resetPlanCsv.split("\n")
+ .map { line =>
+ val Array(topic, partition, offset) = line.split(",").map(_.trim)
+ val topicPartition = new TopicPartition(topic, partition.asInstanceOf[Int])
+ val offsetAndMetadata = new OffsetAndMetadata(offset.asInstanceOf[Long])
+ (topicPartition, offsetAndMetadata)
+ }.toMap
+ }
+
+ private def prepareOffsetsToReset(groupId: String, partitionsToReset: Iterable[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
+ if (opts.options.has(opts.resetToOffsetOpt)) {
+ val offset = opts.options.valueOf(opts.resetToOffsetOpt)
+ partitionsToReset.map {
+ topicPartition => (topicPartition, new OffsetAndMetadata(offset))
+ }.toMap
+ } else if (opts.options.has(opts.resetToEarliestOpt)) {
+ partitionsToReset.map { topicPartition =>
+ getLogStartOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+ case _ => null
+ }
+ }.toMap
+ } else if (opts.options.has(opts.resetToLatestOpt)) {
+ partitionsToReset.map { topicPartition =>
+ getLogEndOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+ case _ => null
+ }
+ }.toMap
+ } else if (opts.options.has(opts.resetShiftByOpt)) {
+ val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
+ partitionsToReset.map { topicPartition =>
+ val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
+ val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
+ throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset"))
+
+ val shiftedOffset = currentOffset + shiftBy
+ val newOffset = getLogEndOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(endOffset) if shiftedOffset > endOffset =>
+ warn(s"New offset ($shiftedOffset) is higher than latest offset. Value will be set to $endOffset")
+ endOffset
+
+ case _ => getLogStartOffset(topicPartition) match {
+ case LogOffsetResult.LogOffset(startOffset) if shiftedOffset < startOffset =>
+ warn(s"New offset ($shiftedOffset) is lower than earliest offset. Value will be set to $startOffset")
+ startOffset
+
+ case _ => shiftedOffset
+ }
+ }
+ (topicPartition, new OffsetAndMetadata(newOffset))
+ }.toMap
+ } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+ partitionsToReset.map { topicPartition =>
+ val timestamp = getDateTime
+ val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
+ logTimestampOffset match {
+ case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+ case _ => null
+ }
+ }.toMap
+ } else if (opts.options.has(opts.resetByDurationOpt)) {
+ partitionsToReset.map { topicPartition =>
+ val duration = opts.options.valueOf(opts.resetByDurationOpt)
+ val now = new Date()
+ val durationParsed = DatatypeFactory.newInstance().newDuration(duration)
+ durationParsed.negate().addTo(now)
+ val timestamp = now.getTime
+ val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp)
+ logTimestampOffset match {
+ case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset))
+ case _ => null
+ }
+ }.toMap
+ } else if (opts.options.has(opts.resetFromFileOpt)) {
+ val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
+ val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
+ val resetPlan = parseResetPlan(resetPlanCsv)
+ partitionsToReset.map { topicPartition =>
+ if (resetPlan.keySet.contains(topicPartition))
+ (topicPartition, resetPlan(topicPartition))
+ else null
+ }.toMap
+ } else {
+ val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
+ partitionsToReset.map { topicPartition =>
+ currentCommittedOffsets.get(topicPartition).map { offset =>
+ (topicPartition, new OffsetAndMetadata(offset))
+ }.orNull
+ }.toMap
+ }
+ }
+
+ private def getDateTime: java.lang.Long = {
+ val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) match {
+ case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
+ case ts => s"${ts}Z"
+ }
+ val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
+ val date = format.parse(datetime)
+ date.getTime
+ }
+
+ override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
+ val rows = assignmentsToReset.map { case (k,v) => s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): List[String]
+ rows.foldRight("")(_ + "\n" + _)
+ }
+
}
- sealed trait LogEndOffsetResult
+ sealed trait LogOffsetResult
- object LogEndOffsetResult {
- case class LogEndOffset(value: Long) extends LogEndOffsetResult
- case object Unknown extends LogEndOffsetResult
- case object Ignore extends LogEndOffsetResult
+ object LogOffsetResult {
+ case class LogOffset(value: Long) extends LogOffsetResult
+ case object Unknown extends LogOffsetResult
+ case object Ignore extends LogOffsetResult
}
class ConsumerGroupCommandOptions(args: Array[String]) {
@@ -489,7 +680,10 @@ object ConsumerGroupCommand extends Logging {
"Multiple URLS can be given to allow fail-over."
val BootstrapServerDoc = "REQUIRED (unless old consumer is used): The server to connect to."
val GroupDoc = "The consumer group we wish to act on."
- val TopicDoc = "The topic whose consumer group information should be deleted."
+ val TopicDoc = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
+ "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
+ "Reset-offsets also supports multiple topic inputs."
+ val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process."
val ListDoc = "List all consumer groups."
val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."
val nl = System.getProperty("line.separator")
@@ -505,6 +699,19 @@ object ConsumerGroupCommand extends Logging {
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
"or is going through some changes)."
val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
+ val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl +
+ "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
+ "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file. And by default it resets to current offset." + nl +
+ "To define the scope use: --all-topics or --topic"
+ val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
+ val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
+ val ResetToOffsetDoc = "Reset offsets to a specific offset."
+ val ResetFromFileDoc = "Reset offsets to values defined in CSV file."
+ val ResetToDatetimeDoc = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'"
+ val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"
+ val ResetToEarliestDoc = "Reset offsets to earliest offset."
+ val ResetToLatestDoc = "Reset offsets to latest offset."
+ val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative"
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
@@ -523,6 +730,7 @@ object ConsumerGroupCommand extends Logging {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
+ val allTopicsOpt = parser.accepts("all-topics", AllTopicsDoc)
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
@@ -536,12 +744,39 @@ object ConsumerGroupCommand extends Logging {
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])
+ val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc)
+ val executeOpt = parser.accepts("execute", ExecuteDoc)
+ val exportOpt = parser.accepts("export", ExportDoc)
+ val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc)
+ .withRequiredArg()
+ .describedAs("offset")
+ .ofType(classOf[Long])
+ val resetFromFileOpt = parser.accepts("from-file", ResetFromFileDoc)
+ .withRequiredArg()
+ .describedAs("path to CSV file")
+ .ofType(classOf[String])
+ val resetToDatetimeOpt = parser.accepts("to-datetime", ResetToDatetimeDoc)
+ .withRequiredArg()
+ .describedAs("datetime")
+ .ofType(classOf[String])
+ val resetByDurationOpt = parser.accepts("by-duration", ResetByDurationDoc)
+ .withRequiredArg()
+ .describedAs("duration")
+ .ofType(classOf[String])
+ val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc)
+ val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc)
+ val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc)
+ .withRequiredArg()
+ .describedAs("number-of-offsets")
+ .ofType(classOf[Long])
val options = parser.parse(args : _*)
val useOldConsumer = options.has(zkConnectOpt)
val describeOptPresent = options.has(describeOpt)
- val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
+ val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
+ val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
+ resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetFromFileOpt)
def checkArgs() {
// check required args
@@ -566,10 +801,19 @@ object ConsumerGroupCommand extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
+ if (options.has(resetOffsetsOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
// check invalid args
- CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2181ae76/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
new file mode 100644
index 0000000..d58231e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package unit.kafka.admin
+
+import java.io.{BufferedWriter, File, FileWriter}
+import java.text.SimpleDateFormat
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.{Calendar, Collections, Date, Properties}
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
+import kafka.admin.{AdminUtils, ConsumerGroupCommand}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.{Before, Test}
+
+/**
+ * Test cases by:
+ * - Non-existing consumer group
+ * - One for each scenario, with scope=all-topics
+ * - scope=one topic, scenario=to-earliest
+ * - scope=one topic+partitions, scenario=to-earliest
+ * - scope=topics, scenario=to-earliest
+ * - scope=topics+partitions, scenario=to-earliest
+ * - export/import
+ */
+class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
+
+ val overridingProps = new Properties()
+ val topic1 = "foo1"
+ val topic2 = "foo2"
+ val group = "test.group"
+ val props = new Properties
+
+ /**
+ * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
+ * test and should not reuse previous configurations unless they select their ports randomly when servers are started.
+ */
+ override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+
+ @Before
+ override def setUp() {
+ super.setUp()
+
+ props.setProperty("group.id", group)
+ }
+
+ @Test
+ def testResetOffsetsNotExistingGroup() {
+ new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset == Map.empty
+ }, "Expected to have an empty assignations map.")
+
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToLocalDateTime() {
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
+ val checkpoint = new Date()
+ val calendar = Calendar.getInstance()
+ calendar.add(Calendar.DATE, -1)
+
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+
+ TestUtils.waitUntilTrue(() => {
+ val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+ assignmentsOption match {
+ case Some(assignments) =>
+ val sumOffset = assignments.filter(_.topic.exists(_ == topic1))
+ .filter(_.offset.isDefined)
+ .map(assignment => assignment.offset.get)
+ .foldLeft(0.toLong)(_ + _)
+ sumOffset == 100
+ case _ => false
+ }
+ }, "Expected that consumer group has consumed all messages from topic/partition.")
+
+ executor.shutdown()
+
+ val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
+ val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
+ val consumerGroupCommand1 = new KafkaConsumerGroupService(opts1)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand1.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+ }, "Expected the consumer group to reset to when offset was 50.")
+
+ printConsumerGroup()
+
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToZonedDateTime() {
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
+
+ val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
+ val checkpoint = new Date()
+
+ TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
+
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+
+ TestUtils.waitUntilTrue(() => {
+ val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+ assignmentsOption match {
+ case Some(assignments) =>
+ val sumOffset = (assignments.filter(_.topic.exists(_ == topic1))
+ .filter(_.offset.isDefined)
+ .map(assignment => assignment.offset.get) foldLeft 0.toLong)(_ + _)
+ sumOffset == 100
+ case _ => false
+ }
+ }, "Expected that consumer group has consumed all messages from topic/partition.")
+
+ executor.shutdown()
+
+ val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
+ val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
+ val consumerGroupCommand1 = new KafkaConsumerGroupService(opts1)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand1.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 50 }
+ }, "Expected the consumer group to reset to when offset was 50.")
+
+ printConsumerGroup()
+
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsByDuration() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+ }, "Expected the consumer group to reset to offset 0 (earliest by duration).")
+
+ printConsumerGroup()
+
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsByDurationToEarliest() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 100 }
+ }, "Expected the consumer group to reset to offset 100 (latest by duration).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToEarliest() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+ }, "Expected the consumer group to reset to offset 0 (earliest).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToLatest() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 200 })
+ }, "Expected the consumer group to reset to offset 200 (latest).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToCurrentOffset() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 100 })
+ }, "Expected the consumer group to reset to offset 100 (current).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int = 1, topic: String, totalMessages: Int) {
+ TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
+ val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic)
+
+
+ TestUtils.waitUntilTrue(() => {
+ val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+ assignmentsOption match {
+ case Some(assignments) =>
+ val sumOffset = assignments.filter(_.topic.exists(_ == topic))
+ .filter(_.offset.isDefined)
+ .map(assignment => assignment.offset.get)
+ .foldLeft(0.toLong)(_ + _)
+ sumOffset == totalMessages
+ case _ => false
+ }
+ }, "Expected the consumer group to consume all messages from topic.")
+
+ executor.shutdown()
+ }
+
+ @Test
+ def testResetOffsetsToSpecificOffset() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 1 })
+ }, "Expected the consumer group to reset to offset 1 (specific offset).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsShiftPlus() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 150 })
+ }, "Expected the consumer group to reset to offset 150 (current + 50).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsShiftMinus() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 })
+ }, "Expected the consumer group to reset to offset 50 (current - 50).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsShiftByLowerThanEarliest() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 0 })
+ }, "Expected the consumer group to reset to offset 0 (earliest by shift).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsShiftByHigherThanLatest() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists({ assignment => assignment._2.offset() == 200 })
+ }, "Expected the consumer group to reset to offset 200 (latest by shift).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToEarliestOnOneTopic() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+ }, "Expected the consumer group to reset to offset 0 (earliest).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToEarliestOnOneTopicAndPartition() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 }
+ }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToEarliestOnTopics() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets",
+ "--group", group,
+ "--topic", topic1,
+ "--topic", topic2,
+ "--to-earliest", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
+ produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.topic() == topic1 } &&
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.topic() == topic2 }
+ }, "Expected the consumer group to reset to offset 0 (earliest).")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ AdminUtils.deleteTopic(zkUtils, topic2)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsToEarliestOnTopicsAndPartitions() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets",
+ "--group", group,
+ "--topic", String.format("%s:1", topic1),
+ "--topic", String.format("%s:1", topic2),
+ "--to-earliest", "--execute")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+ AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+ produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 && assignment._1.topic() == topic1 }
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 && assignment._1.topic() == topic2 }
+ }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ AdminUtils.deleteTopic(zkUtils, topic2)
+ consumerGroupCommand.close()
+ }
+
+ @Test
+ def testResetOffsetsExportImportPlan() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--export")
+ val opts = new ConsumerGroupCommandOptions(cgcArgs)
+ val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+ AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+
+ produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+
+ val file = File.createTempFile("reset", ".csv")
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommand.resetOffsets()
+ val bw = new BufferedWriter(new FileWriter(file))
+ bw.write(consumerGroupCommand.exportOffsetsToReset(assignmentsToReset))
+ bw.close()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } && file.exists()
+ }, "Expected the consume all messages and save reset offsets plan to file")
+
+
+ val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--from-file", file.getCanonicalPath)
+ val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
+ val consumerGroupCommandExec = new KafkaConsumerGroupService(optsExec)
+
+
+ TestUtils.waitUntilTrue(() => {
+ val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
+ assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+ }, "Expected the consumer group to reset to offset 0 (earliest) by file.")
+
+ file.deleteOnExit()
+
+ printConsumerGroup()
+ AdminUtils.deleteTopic(zkUtils, topic1)
+ consumerGroupCommand.close()
+ }
+
+ private def printConsumerGroup() {
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, "--describe")
+ ConsumerGroupCommand.main(cgcArgs)
+ }
+
+}
+
+
+class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable {
+ val props = new Properties
+ props.put("bootstrap.servers", broker)
+ props.put("group.id", groupId)
+ props.put("key.deserializer", classOf[StringDeserializer].getName)
+ props.put("value.deserializer", classOf[StringDeserializer].getName)
+ val consumer = new KafkaConsumer(props)
+
+ def run() {
+ try {
+ consumer.subscribe(Collections.singleton(topic))
+ while (true)
+ consumer.poll(Long.MaxValue)
+ } catch {
+ case _: WakeupException => // OK
+ } finally {
+ consumer.close()
+ }
+ }
+
+ def shutdown() {
+ consumer.wakeup()
+ }
+}
+
+class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
+ val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
+ var consumers: List[ConsumerThread] = List[ConsumerThread]()
+
+ for (i <- 1 to numConsumers) {
+ val consumer = new ConsumerThread(broker, i, groupId, topic)
+ consumers ++= List(consumer)
+ executor.submit(consumer)
+ }
+
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ shutdown()
+ }
+ })
+
+ def shutdown() {
+ consumers.foreach(_.shutdown())
+ executor.shutdown()
+ try {
+ executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
+ } catch {
+ case e: InterruptedException =>
+ e.printStackTrace()
+ }
+ }
+}