You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/17 21:27:00 UTC

kafka git commit: KAFKA-4743; [KIP-122] Add Reset Consumer Group Offsets tooling

Repository: kafka
Updated Branches:
  refs/heads/trunk 6910baf54 -> 2181ae768


KAFKA-4743; [KIP-122] Add Reset Consumer Group Offsets tooling

Author: Jorge Quilcate <qu...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2624 from jeqo/feature/rewind-consumer-group-offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2181ae76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2181ae76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2181ae76

Branch: refs/heads/trunk
Commit: 2181ae768719a9ae3a929ba875faa89c67edf643
Parents: 6910baf
Author: Jorge Quilcate <qu...@gmail.com>
Authored: Wed May 17 14:24:27 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 17 14:24:40 2017 -0700

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 302 +++++++++-
 .../admin/ResetConsumerGroupOffsetTest.scala    | 601 +++++++++++++++++++
 2 files changed, 874 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2181ae76/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index dd7a477..69f0d8a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -17,7 +17,9 @@
 
 package kafka.admin
 
-import java.util.Properties
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties}
+import javax.xml.datatype.DatatypeFactory
 
 import joptsimple.{OptionParser, OptionSpec}
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
@@ -27,7 +29,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.common.errors.BrokerNotAvailableException
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import org.apache.kafka.common.internals.Topic
@@ -38,7 +40,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
+import scala.collection.{Seq, Set, mutable}
 
 object ConsumerGroupCommand extends Logging {
 
@@ -46,12 +48,12 @@ object ConsumerGroupCommand extends Logging {
     val opts = new ConsumerGroupCommandOptions(args)
 
     if (args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
+      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
 
     // should have exactly one action
-    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
+    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has _)
     if (actions != 1)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets")
 
     opts.checkArgs()
 
@@ -102,6 +104,15 @@ object ConsumerGroupCommand extends Logging {
           case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
         }
       }
+      else if (opts.options.has(opts.resetOffsetsOpt)) {
+        val offsetsToReset = consumerGroupService.resetOffsets()
+        val export = opts.options.has(opts.exportOpt)
+        if (export) {
+          val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset)
+          println(exported)
+        } else
+          printOffsetsToReset(offsetsToReset)
+      }
     } catch {
       case e: Throwable =>
         printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
@@ -134,6 +145,20 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
+  /**
+   * Prints a reset plan as a table with one row per partition.
+   *
+   * The output starts with an empty line followed by the TOPIC / PARTITION / NEW-OFFSET header;
+   * every row uses the same left-aligned column widths as the header.
+   *
+   * @param groupAssignmentsToReset the new offset of each partition included in the plan
+   */
+  def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
+    val rowFormat = "%-30s %-10s %-15s"
+    println("\n" + rowFormat.format("TOPIC", "PARTITION", "NEW-OFFSET"))
+
+    groupAssignmentsToReset.foreach { case (topicPartition, newOffset) =>
+      println(rowFormat.format(topicPartition.topic, topicPartition.partition, newOffset.offset))
+    }
+  }
+
   protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String],
                                                 partition: Option[Int], offset: Option[Long], lag: Option[Long],
                                                 consumerId: Option[String], host: Option[String],
@@ -151,7 +176,7 @@ object ConsumerGroupCommand extends Logging {
 
     protected def opts: ConsumerGroupCommandOptions
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult
 
     protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]])
 
@@ -195,12 +220,16 @@ object ConsumerGroupCommand extends Logging {
                                  clientIdOpt, logEndOffsetOpt)
 
       getLogEndOffset(new TopicPartition(topic, partition)) match {
-        case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
-        case LogEndOffsetResult.Unknown => getDescribePartitionResult(None)
-        case LogEndOffsetResult.Ignore => null
+        case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset))
+        case LogOffsetResult.Unknown => getDescribePartitionResult(None)
+        case LogOffsetResult.Ignore => null
       }
     }
 
+
+    /**
+     * Computes (and possibly commits) the offsets a consumer group should be reset to.
+     *
+     * This default implementation rejects the operation; a service that supports resetting offsets overrides it.
+     *
+     * @return the new offset of each partition included in the reset
+     * @throws UnsupportedOperationException always, unless overridden
+     */
+    def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] =
+      // Carry a message: the command's error handler prints e.getMessage, which would otherwise be "null".
+      throw new UnsupportedOperationException("Resetting consumer group offsets is not supported by this consumer group service.")
+
+    /**
+     * Renders a reset plan in an exportable text format.
+     *
+     * This default implementation rejects the operation; a service that supports exporting overrides it.
+     *
+     * @param assignmentsToReset the new offset of each partition included in the reset
+     * @return the textual representation of the plan
+     * @throws UnsupportedOperationException always, unless overridden
+     */
+    def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String =
+      // Carry a message: the command's error handler prints e.getMessage, which would otherwise be "null".
+      throw new UnsupportedOperationException("Exporting consumer group offsets is not supported by this consumer group service.")
   }
 
   class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
@@ -278,20 +307,20 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = {
+    // Looks up the log end offset of `topicPartition` through the partition leader reported by zkUtils.
+    // Returns LogOffset(value) on success, Unknown when the leader id is -1, and Ignore when no consumer
+    // could be obtained for the leader or no leader is registered at all.
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
       zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match {
-        case Some(-1) => LogEndOffsetResult.Unknown
+        // A leader id of -1 is reported as an unknown offset rather than dropped
+        case Some(-1) => LogOffsetResult.Unknown
         case Some(brokerId) =>
           getZkConsumer(brokerId).map { consumer =>
             val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
             val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
             val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
             consumer.close()
-            LogEndOffsetResult.LogEndOffset(logEndOffset)
-          }.getOrElse(LogEndOffsetResult.Ignore)
+            LogOffsetResult.LogOffset(logEndOffset)
+          // No consumer available for the leader broker: the partition is ignored
+          }.getOrElse(LogOffsetResult.Ignore)
         case None =>
           printError(s"No broker for partition '$topicPartition'")
-          LogEndOffsetResult.Ignore
+          LogOffsetResult.Ignore
       }
     }
 
@@ -380,7 +409,6 @@ object ConsumerGroupCommand extends Logging {
           None
       }
     }
-
   }
 
   class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
@@ -434,12 +462,29 @@ object ConsumerGroupCommand extends Logging {
       )
     }
 
-    protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = {
+    /**
+     * Fetches the end offset of a single partition through the consumer's `endOffsets` call.
+     *
+     * @param topicPartition the partition to look up
+     * @return the end offset wrapped in [[LogOffsetResult.LogOffset]]
+     */
+    protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = {
+      val endOffsets = getConsumer().endOffsets(List(topicPartition).asJava)
+      LogOffsetResult.LogOffset(endOffsets.get(topicPartition))
+    }
+
+    /**
+     * Fetches the first available offset of a single partition through the consumer's `beginningOffsets` call.
+     *
+     * @param topicPartition the partition to look up
+     * @return the beginning offset wrapped in [[LogOffsetResult.LogOffset]]
+     */
+    protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = {
+      val beginningOffsets = getConsumer().beginningOffsets(List(topicPartition).asJava)
+      LogOffsetResult.LogOffset(beginningOffsets.get(topicPartition))
+    }
+
+    /**
+     * Finds the offset of a partition that corresponds to a timestamp, using the consumer's `offsetsForTimes`.
+     *
+     * Falls back to the log end offset when the lookup yields nothing for the partition.
+     *
+     * @param topicPartition the partition to look up
+     * @param timestamp      the target timestamp, in milliseconds since the epoch
+     * @return the resolved offset, or the result of [[getLogEndOffset]] when no offset matches the timestamp
+     */
+    protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long): LogOffsetResult = {
       val consumer = getConsumer()
       consumer.assign(List(topicPartition).asJava)
-      consumer.seekToEnd(List(topicPartition).asJava)
-      val logEndOffset = consumer.position(topicPartition)
-      LogEndOffsetResult.LogEndOffset(logEndOffset)
+      val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava)
+      // The returned map can hold a null value for the partition (no message at or after the timestamp),
+      // so checking for an empty map is not enough: guard the entry itself before dereferencing it.
+      Option(offsetsForTimes).flatMap(offsets => Option(offsets.get(topicPartition))) match {
+        case Some(offsetAndTimestamp) => LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
+        case None => getLogEndOffset(topicPartition)
+      }
     }
 
     def close() {
@@ -474,14 +519,160 @@ object ConsumerGroupCommand extends Logging {
       new KafkaConsumer(properties)
     }
 
+    /**
+     * Builds the reset plan for the group given with --group and, when --execute is present, commits it.
+     *
+     * The plan is only built while the group state is "Empty"; for any other state an error is printed
+     * and an empty plan is returned.
+     *
+     * @return the new offset of each partition in the plan, or an empty map if the group is not inactive
+     */
+    override def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = {
+      val groupId = opts.options.valueOf(opts.groupOpt)
+      val timeoutMs = opts.options.valueOf(opts.timeoutMsOpt)
+      val groupState = adminClient.describeConsumerGroup(groupId, timeoutMs).state
+
+      if (groupState == "Empty") {
+        val offsetsToCommit = prepareOffsetsToReset(groupId, getPartitionsToReset(groupId))
+        // Without --execute this is a dry run: the plan is returned but nothing is committed
+        if (opts.options.has(opts.executeOpt))
+          getConsumer().commitSync(offsetsToCommit.asJava)
+        offsetsToCommit
+      } else {
+        printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $groupState.")
+        Map.empty
+      }
+    }
+
+    /**
+     * Expands the --topic arguments into the partitions they designate.
+     *
+     * An argument of the form `topic:0,1,2` selects the listed partitions; a bare `topic` selects every
+     * partition the consumer reports for that topic.
+     *
+     * @param topicArgs the raw values given to --topic
+     * @return the partitions selected by all arguments
+     * @throws IllegalArgumentException if an argument is malformed (e.g. `topic:`), a partition id is not a
+     *                                  number, or no partition information is available for a topic
+     */
+    private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Iterable[TopicPartition] = topicArgs.flatMap {
+      case topicArg if topicArg.contains(":") =>
+        // split drops trailing empty strings, so `topic:` yields a single element: match the shape explicitly
+        topicArg.split(":") match {
+          case Array(topic, partitions) if topic.nonEmpty =>
+            partitions.split(",").map(partition => new TopicPartition(topic, partition.trim.toInt)).toSeq
+          case _ =>
+            throw new IllegalArgumentException(s"Invalid topic argument '$topicArg'. Expected format: `topic` or `topic:0,1,2`")
+        }
+      case topic =>
+        // Guard against a null partition list so an unknown topic fails with a readable message
+        Option(getConsumer().partitionsFor(topic)) match {
+          case Some(partitionInfos) =>
+            partitionInfos.asScala.map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
+          case None =>
+            throw new IllegalArgumentException(s"No partition information available for topic '$topic'")
+        }
+    }
+
+    /**
+     * Determines the scope of the reset from the command line.
+     *
+     * --all-topics selects every partition the group has offsets for; otherwise the --topic values are
+     * parsed. If neither scope option is present, usage is printed and the command exits.
+     *
+     * @param groupId the consumer group being reset
+     * @return the partitions whose offsets should be reset
+     */
+    private def getPartitionsToReset(groupId: String): Iterable[TopicPartition] = {
+      val parsedOptions = opts.options
+      if (parsedOptions.has(opts.allTopicsOpt))
+        adminClient.listGroupOffsets(groupId).keys
+      else if (!parsedOptions.has(opts.topicOpt))
+        CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
+      else
+        parseTopicPartitionsToReset(parsedOptions.valuesOf(opts.topicOpt).asScala)
+    }
+
+    /**
+     * Parses a reset plan in CSV form, one `topic,partition,offset` entry per line.
+     *
+     * Blank lines are skipped and whitespace around each field is ignored.
+     *
+     * @param resetPlanCsv the CSV content
+     * @return the offset to reset each listed partition to
+     * @throws IllegalArgumentException if a line does not have exactly three fields, or the partition or
+     *                                  offset is not a number
+     */
+    private def parseResetPlan(resetPlanCsv: String): Map[TopicPartition, OffsetAndMetadata] = {
+      resetPlanCsv.split("\n")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map { line =>
+          line.split(",").map(_.trim) match {
+            case Array(topic, partition, offset) =>
+              // The fields are Strings: they must be parsed, a cast to Int/Long throws ClassCastException
+              (new TopicPartition(topic, partition.toInt), new OffsetAndMetadata(offset.toLong))
+            case _ =>
+              throw new IllegalArgumentException(s"Invalid reset plan line '$line'. Expected format: topic,partition,offset")
+          }
+        }.toMap
+    }
+
+    /**
+     * Computes the offset each partition would be reset to, according to the scenario chosen on the command line.
+     *
+     * Scenarios are checked in this order: --to-offset, --to-earliest, --to-latest, --shift-by, --to-datetime,
+     * --by-duration, --from-file. When none is given, the currently committed offsets are used.
+     * Partitions for which no offset can be determined (unresolved lookup, not listed in the CSV file, or no
+     * committed offset in the default scenario) are left out of the result.
+     *
+     * @param groupId           the consumer group being reset
+     * @param partitionsToReset the partitions in scope
+     * @return the new offset for each partition that could be resolved
+     * @throws IllegalArgumentException with --shift-by, if a partition in scope has no committed offset
+     */
+    private def prepareOffsetsToReset(groupId: String, partitionsToReset: Iterable[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
+      // Resolves every partition with `lookup`. An unresolved partition is skipped with a warning: putting a
+      // null entry in the collection instead would make `toMap` fail with a NullPointerException.
+      def offsetsFrom(lookup: TopicPartition => LogOffsetResult): Map[TopicPartition, OffsetAndMetadata] =
+        partitionsToReset.flatMap { topicPartition =>
+          lookup(topicPartition) match {
+            case LogOffsetResult.LogOffset(offset) => Some((topicPartition, new OffsetAndMetadata(offset)))
+            case _ =>
+              warn(s"Could not determine the new offset for partition $topicPartition. It will not be reset.")
+              None
+          }
+        }.toMap
+
+      if (opts.options.has(opts.resetToOffsetOpt)) {
+        val offset = opts.options.valueOf(opts.resetToOffsetOpt)
+        partitionsToReset.map { topicPartition =>
+          (topicPartition, new OffsetAndMetadata(offset))
+        }.toMap
+      } else if (opts.options.has(opts.resetToEarliestOpt)) {
+        offsetsFrom(topicPartition => getLogStartOffset(topicPartition))
+      } else if (opts.options.has(opts.resetToLatestOpt)) {
+        offsetsFrom(topicPartition => getLogEndOffset(topicPartition))
+      } else if (opts.options.has(opts.resetShiftByOpt)) {
+        val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
+        val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
+        partitionsToReset.map { topicPartition =>
+          val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
+            throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset"))
+
+          // Clamp the shifted offset to the [earliest, latest] range of the partition
+          val shiftedOffset = currentOffset + shiftBy
+          val newOffset = getLogEndOffset(topicPartition) match {
+            case LogOffsetResult.LogOffset(endOffset) if shiftedOffset > endOffset =>
+              warn(s"New offset ($shiftedOffset) is higher than latest offset. Value will be set to $endOffset")
+              endOffset
+
+            case _ => getLogStartOffset(topicPartition) match {
+              case LogOffsetResult.LogOffset(startOffset) if shiftedOffset < startOffset =>
+                warn(s"New offset ($shiftedOffset) is lower than earliest offset. Value will be set to $startOffset")
+                startOffset
+
+              case _ => shiftedOffset
+            }
+          }
+          (topicPartition, new OffsetAndMetadata(newOffset))
+        }.toMap
+      } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+        // Parse the datetime once; it is the same for every partition
+        val timestamp = getDateTime
+        offsetsFrom(topicPartition => getLogTimestampOffset(topicPartition, timestamp))
+      } else if (opts.options.has(opts.resetByDurationOpt)) {
+        // Compute "now - duration" once so that all partitions are reset to the same point in time
+        val duration = DatatypeFactory.newInstance().newDuration(opts.options.valueOf(opts.resetByDurationOpt))
+        val resetPoint = new Date()
+        duration.negate().addTo(resetPoint)
+        val timestamp = resetPoint.getTime
+        offsetsFrom(topicPartition => getLogTimestampOffset(topicPartition, timestamp))
+      } else if (opts.options.has(opts.resetFromFileOpt)) {
+        val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
+        val resetPlan = parseResetPlan(Utils.readFileAsString(resetPlanPath))
+        // Only partitions that are both in scope and listed in the file are reset
+        partitionsToReset.flatMap { topicPartition =>
+          resetPlan.get(topicPartition).map(offsetAndMetadata => (topicPartition, offsetAndMetadata))
+        }.toMap
+      } else {
+        // Default scenario: keep the currently committed offset of every partition that has one
+        val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
+        partitionsToReset.flatMap { topicPartition =>
+          currentCommittedOffsets.get(topicPartition).map(offset => (topicPartition, new OffsetAndMetadata(offset)))
+        }.toMap
+      }
+    }
+
+    /**
+     * Converts the value of --to-datetime into milliseconds since the epoch.
+     *
+     * The value is parsed with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSSX`. When the time part carries no zone
+     * designator ('+', '-' or 'Z'), 'Z' is appended before parsing.
+     *
+     * @return the timestamp in milliseconds
+     * @throws IllegalArgumentException  if the value has no 'T' separating the date from the time
+     * @throws java.text.ParseException if the value does not match the pattern
+     */
+    private def getDateTime: java.lang.Long = {
+      val rawDatetime = opts.options.valueOf(opts.resetToDatetimeOpt)
+      // Only the time part is inspected for a zone designator: the date part always contains '-'
+      val timePart = rawDatetime.split("T") match {
+        case Array(_, time) => time
+        case _ =>
+          throw new IllegalArgumentException(s"Invalid datetime '$rawDatetime'. Expected format: 'YYYY-MM-DDTHH:mm:SS.sss'")
+      }
+      val hasZone = timePart.contains("+") || timePart.contains("-") || timePart.contains("Z")
+      val datetime = if (hasZone) rawDatetime else s"${rawDatetime}Z"
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
+      format.parse(datetime).getTime
+    }
+
+    /**
+     * Renders a reset plan as CSV text: one `topic,partition,offset` line per partition, each terminated by
+     * a newline. An empty plan yields an empty string.
+     *
+     * @param assignmentsToReset the new offset of each partition in the plan
+     * @return the CSV representation of the plan
+     */
+    override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
+      val csvLines = assignmentsToReset.map { case (topicPartition, offsetAndMetadata) =>
+        s"${topicPartition.topic},${topicPartition.partition},${offsetAndMetadata.offset}\n"
+      }
+      csvLines.mkString
+    }
+
   }
 
-  sealed trait LogEndOffsetResult
+  // Outcome of looking up a log offset (start, end or by timestamp) for a partition.
+  sealed trait LogOffsetResult
 
-  object LogEndOffsetResult {
-    case class LogEndOffset(value: Long) extends LogEndOffsetResult
-    case object Unknown extends LogEndOffsetResult
-    case object Ignore extends LogEndOffsetResult
+  object LogOffsetResult {
+    // The lookup succeeded and resolved to `value`
+    case class LogOffset(value: Long) extends LogOffsetResult
+    // The offset could not be determined; describe output still shows the partition, without an end offset
+    case object Unknown extends LogOffsetResult
+    // The partition should be left out; describe output drops it
+    case object Ignore extends LogOffsetResult
   }
 
   class ConsumerGroupCommandOptions(args: Array[String]) {
@@ -489,7 +680,10 @@ object ConsumerGroupCommand extends Logging {
       "Multiple URLS can be given to allow fail-over."
     val BootstrapServerDoc = "REQUIRED (unless old consumer is used): The server to connect to."
     val GroupDoc = "The consumer group we wish to act on."
-    val TopicDoc = "The topic whose consumer group information should be deleted."
+    val TopicDoc = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
+      "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
+      "Reset-offsets also supports multiple topic inputs."
+    val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process."
     val ListDoc = "List all consumer groups."
     val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."
     val nl = System.getProperty("line.separator")
@@ -505,6 +699,19 @@ object ConsumerGroupCommand extends Logging {
       "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
       "or is going through some changes)."
     val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
+    // Help text for --reset-offsets. The scenario and scope names listed here must match the option names
+    // registered on the parser (to-offset, to-datetime, by-duration, to-earliest, to-latest, shift-by, from-file).
+    val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl +
+      "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
+      "Has the following scenarios to choose: --to-offset, --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file. And by default it resets to current offset." + nl +
+      "To define the scope use: --all-topics or --topic"
+    val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
+    val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
+    val ResetToOffsetDoc = "Reset offsets to a specific offset."
+    val ResetFromFileDoc = "Reset offsets to values defined in CSV file."
+    val ResetToDatetimeDoc = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'"
+    val ResetByDurationDoc = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"
+    val ResetToEarliestDoc = "Reset offsets to earliest offset."
+    val ResetToLatestDoc = "Reset offsets to latest offset."
+    val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative"
 
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
@@ -523,6 +730,7 @@ object ConsumerGroupCommand extends Logging {
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
+    val allTopicsOpt = parser.accepts("all-topics", AllTopicsDoc)
     val listOpt = parser.accepts("list", ListDoc)
     val describeOpt = parser.accepts("describe", DescribeDoc)
     val deleteOpt = parser.accepts("delete", DeleteDoc)
@@ -536,12 +744,39 @@ object ConsumerGroupCommand extends Logging {
                                   .withRequiredArg
                                   .describedAs("command config property file")
                                   .ofType(classOf[String])
+    val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc)
+    val executeOpt = parser.accepts("execute", ExecuteDoc)
+    val exportOpt = parser.accepts("export", ExportDoc)
+    val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc)
+                           .withRequiredArg()
+                           .describedAs("offset")
+                           .ofType(classOf[Long])
+    val resetFromFileOpt = parser.accepts("from-file", ResetFromFileDoc)
+                                 .withRequiredArg()
+                                 .describedAs("path to CSV file")
+                                 .ofType(classOf[String])
+    val resetToDatetimeOpt = parser.accepts("to-datetime", ResetToDatetimeDoc)
+                                   .withRequiredArg()
+                                   .describedAs("datetime")
+                                   .ofType(classOf[String])
+    val resetByDurationOpt = parser.accepts("by-duration", ResetByDurationDoc)
+                                   .withRequiredArg()
+                                   .describedAs("duration")
+                                   .ofType(classOf[String])
+    val resetToEarliestOpt = parser.accepts("to-earliest", ResetToEarliestDoc)
+    val resetToLatestOpt = parser.accepts("to-latest", ResetToLatestDoc)
+    val resetShiftByOpt = parser.accepts("shift-by", ResetShiftByDoc)
+                             .withRequiredArg()
+                             .describedAs("number-of-offsets")
+                             .ofType(classOf[Long])
     val options = parser.parse(args : _*)
 
     val useOldConsumer = options.has(zkConnectOpt)
     val describeOptPresent = options.has(describeOpt)
 
-    val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
+    val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
+    val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
+      resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetFromFileOpt)
 
     def checkArgs() {
       // check required args
@@ -566,10 +801,19 @@ object ConsumerGroupCommand extends Logging {
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
       if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
         CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
+      // Validation specific to --reset-offsets: a group is required and at most one reset scenario may be given.
+      // The braces are required: without them only the first statement is guarded by the condition.
+      if (options.has(resetOffsetsOpt)) {
+        CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
+      }
 
       // check invalid args
-      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2181ae76/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
new file mode 100644
index 0000000..d58231e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -0,0 +1,601 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package unit.kafka.admin
+
+import java.io.{BufferedWriter, File, FileWriter}
+import java.text.SimpleDateFormat
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.{Calendar, Collections, Date, Properties}
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
+import kafka.admin.{AdminUtils, ConsumerGroupCommand}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.{Before, Test}
+
+/**
+  * Test cases by:
+  * - Non-existing consumer group
+  * - One for each scenario, with scope=all-topics
+  * - scope=one topic, scenario=to-earliest
+  * - scope=one topic+partitions, scenario=to-earliest
+  * - scope=topics, scenario=to-earliest
+  * - scope=topics+partitions, scenario=to-earliest
+  * - export/import
+  */
+class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
+
+  val overridingProps = new Properties()
+  val topic1 = "foo1"
+  val topic2 = "foo2"
+  val group = "test.group"
+  val props = new Properties
+
+  /**
+    * Builds the configuration for the single broker these tests run against: the properties generated by
+    * TestUtils.createBrokerConfigs (controlled shutdown disabled) are handed to KafkaConfig.fromProps with overridingProps.
+    */
+  override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { brokerProps => KafkaConfig.fromProps(brokerProps, overridingProps) }
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    // Record the consumer group id used by these tests in `props`.
+    props.put("group.id", group)
+  }
+
+  @Test
+  def testResetOffsetsNotExistingGroup() {
+    // A consumer joins `group`, but the reset targets "missing.group", so no assignments are expected.
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics")
+    val consumerGroupCommand = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(cgcArgs))
+
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupCommand.resetOffsets() == Map.empty
+    }, "Expected to have an empty assignations map.")
+
+    // Stop the background consumer explicitly instead of leaving it running until the JVM shutdown hook fires.
+    executor.shutdown()
+    consumerGroupCommand.close()
+  }
+
+  @Test
+  def testResetOffsetsToLocalDateTime() {
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    // Target datetime is formatted without a zone designator and lies one day in the past, i.e. it is taken
+    // before any message is produced below, which is why the reset is expected to land on offset 0.
+    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
+    val calendar = Calendar.getInstance()
+    calendar.add(Calendar.DATE, -1)
+
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+
+    TestUtils.waitUntilTrue(() => {
+      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      assignmentsOption match {
+        case Some(assignments) =>
+          val sumOffset = assignments.filter(_.topic.exists(_ == topic1))
+            .filter(_.offset.isDefined)
+            .map(assignment => assignment.offset.get)
+            .foldLeft(0.toLong)(_ + _)
+          sumOffset == 100
+        case _ => false
+      }
+    }, "Expected that consumer group has consumed all messages from topic/partition.")
+
+    executor.shutdown()
+
+    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
+    val consumerGroupCommand1 = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(cgcArgs1))
+
+    TestUtils.waitUntilTrue(() => {
+      val assignmentsToReset = consumerGroupCommand1.resetOffsets()
+      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+    }, "Expected the consumer group to reset to offset 0 (datetime precedes all messages).")
+
+    printConsumerGroup()
+
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    consumerGroupCommand.close()
+    consumerGroupCommand1.close()
+  }
+
+  @Test
+  def testResetOffsetsToZonedDateTime() {
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
+    // The checkpoint is taken between the two batches of 50 messages; it is the datetime the group is reset to.
+    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
+    val checkpoint = new Date()
+
+    TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+
+    TestUtils.waitUntilTrue(() => {
+      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      assignmentsOption match {
+        case Some(assignments) =>
+          val sumOffset = (assignments.filter(_.topic.exists(_ == topic1))
+            .filter(_.offset.isDefined)
+            .map(assignment => assignment.offset.get) foldLeft 0.toLong)(_ + _)
+          sumOffset == 100
+        case _ => false
+      }
+    }, "Expected that consumer group has consumed all messages from topic/partition.")
+
+    executor.shutdown()
+
+    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
+    val consumerGroupCommand1 = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(cgcArgs1))
+
+    TestUtils.waitUntilTrue(() => {
+      val assignmentsToReset = consumerGroupCommand1.resetOffsets()
+      assignmentsToReset.exists { assignment => assignment._2.offset() == 50 }
+    }, "Expected the consumer group to reset to when offset was 50.")
+
+    printConsumerGroup()
+
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    consumerGroupCommand.close()
+    consumerGroupCommand1.close()
+  }
+
+  @Test
+  def testResetOffsetsByDuration() {
+    // Reset by the duration "PT1M"; the test expects the plan to land on offset 0.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 0 }
+    }, "Expected the consumer group to reset to offset 0 (earliest by duration).")
+
+    printConsumerGroup()
+
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsByDurationToEarliest() {
+    // Despite the method name, the duration "PT0.1S" is expected to resolve to offset 100 (see the failure message).
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 100 }
+    }, "Expected the consumer group to reset to offset 100 (latest by duration).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToEarliest() {
+    // After the group has consumed 100 messages, --to-earliest on all topics is expected to yield offset 0.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 0 }
+    }, "Expected the consumer group to reset to offset 0 (earliest).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToLatest() {
+    // --to-latest on all topics is expected to yield offset 200 once 200 messages have been produced in total.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    // Produce 100 more messages after the group's consumers have been shut down.
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 200 }
+    }, "Expected the consumer group to reset to offset 200 (latest).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToCurrentOffset() {
+    // No reset scenario option is passed; the test expects the plan to stay at offset 100, where the group stopped.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    // Produce 100 more messages after the group's consumers have been shut down.
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 100 }
+    }, "Expected the consumer group to reset to offset 100 (current).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  // Produces `totalMessages` messages to `topic`, starts `numConsumers` consumers in `group`, waits until
+  // describeGroup() reports offsets for `topic` that add up to `totalMessages`, and then shuts the
+  // consumers down again.
+  private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int = 1, topic: String, totalMessages: Int) {
+    TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
+    val groupConsumers = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic)
+
+    TestUtils.waitUntilTrue(() => {
+      val (_, describedAssignments) = consumerGroupCommand.describeGroup()
+      describedAssignments.exists { partitionStates =>
+        val committedTotal = partitionStates
+          .filter(state => state.topic.exists(_ == topic))
+          .flatMap(state => state.offset)
+          .foldLeft(0L)(_ + _)
+        committedTotal == totalMessages
+      }
+    }, "Expected the consumer group to consume all messages from topic.")
+
+    groupConsumers.shutdown()
+  }
+
+  @Test
+  def testResetOffsetsToSpecificOffset() {
+    // --to-offset 1 on all topics after the group has consumed 100 messages.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    // The plan must contain offset 1 for at least one partition.
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 1 }
+    }, "Expected the consumer group to reset to offset 1 (specific offset).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsShiftPlus() {
+    // --shift-by 50 from the consumed position (100), with 200 messages in the topic, is expected to give 150.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 150 }
+    }, "Expected the consumer group to reset to offset 150 (current + 50).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsShiftMinus() {
+    // --shift-by -50 from the consumed position (100) is expected to give offset 50.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    // Produce 100 more messages after the group's consumers have been shut down.
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 50 }
+    }, "Expected the consumer group to reset to offset 50 (current - 50).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsShiftByLowerThanEarliest() {
+    // --shift-by -150 from the consumed position (100) would be negative; the test expects offset 0 instead.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 0 }
+    }, "Expected the consumer group to reset to offset 0 (earliest by shift).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsShiftByHigherThanLatest() {
+    // --shift-by 150 from the consumed position (100) exceeds the 200 produced messages; the test expects 200.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 200 }
+    }, "Expected the consumer group to reset to offset 200 (latest by shift).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToEarliestOnOneTopic() {
+    // Scope is a single topic passed through --topic rather than --all-topics.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (_, committed) => committed.offset() == 0 }
+    }, "Expected the consumer group to reset to offset 0 (earliest).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToEarliestOnOneTopicAndPartition() {
+    // Scope is "<topic1>:1", i.e. the --topic value carries a partition suffix; the topic has two partitions.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", s"${topic1}:1", "--to-earliest", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+
+    produceConsumeAndShutdown(resetService, 2, topic1, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      plannedOffsets.exists { case (tp, committed) => committed.offset() == 0 && tp.partition() == 1 }
+    }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToEarliestOnTopics() {
+    // Scope is two topics, each passed through its own --topic option; both must end up at offset 0.
+    val resetArgs = Array("--bootstrap-server", brokerList, "--reset-offsets",
+      "--group", group,
+      "--topic", topic1,
+      "--topic", topic2,
+      "--to-earliest", "--execute")
+    val resetService = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(resetArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+
+    produceConsumeAndShutdown(resetService, 1, topic1, 100)
+    produceConsumeAndShutdown(resetService, 1, topic2, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val plannedOffsets = resetService.resetOffsets()
+      def resetToZero(topicName: String) = plannedOffsets.exists { case (tp, committed) => committed.offset() == 0 && tp.topic() == topicName }
+      resetToZero(topic1) && resetToZero(topic2)
+    }, "Expected the consumer group to reset to offset 0 (earliest).")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    AdminUtils.deleteTopic(zkUtils, topic2)
+    resetService.close()
+  }
+
+  @Test
+  def testResetOffsetsToEarliestOnTopicsAndPartitions() {
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets",
+      "--group", group,
+      "--topic", String.format("%s:1", topic1),
+      "--topic", String.format("%s:1", topic2),
+      "--to-earliest", "--execute")
+    val consumerGroupCommand = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(cgcArgs))
+
+    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+    produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+    produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
+
+    TestUtils.waitUntilTrue(() => {
+      val assignmentsToReset = consumerGroupCommand.resetOffsets()
+      // Both conditions must hold: partition 1 of topic1 AND partition 1 of topic2 are reset to offset 0.
+      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 && assignment._1.topic() == topic1 } &&
+        assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && assignment._1.partition() == 1 && assignment._1.topic() == topic2 }
+    }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    AdminUtils.deleteTopic(zkUtils, topic2)
+    consumerGroupCommand.close()
+  }
+
+  @Test
+  def testResetOffsetsExportImportPlan() {
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--export")
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+
+    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+
+    produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+
+    val file = File.createTempFile("reset", ".csv")
+    // Register cleanup right away so the temp file is removed at JVM exit even if an assertion below fails.
+    file.deleteOnExit()
+
+    TestUtils.waitUntilTrue(() => {
+      val assignmentsToReset = consumerGroupCommand.resetOffsets()
+      val bw = new BufferedWriter(new FileWriter(file))
+      try bw.write(consumerGroupCommand.exportOffsetsToReset(assignmentsToReset))
+      finally bw.close()
+      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 } && file.exists()
+    }, "Expected the consume all messages and save reset offsets plan to file")
+
+    // NOTE(review): the command's argument checks treat --from-file as incompatible with the other reset scenarios,
+    // yet --to-earliest is passed here as well; confirm the plan is really read from the file.
+    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--from-file", file.getCanonicalPath)
+    val consumerGroupCommandExec = new KafkaConsumerGroupService(new ConsumerGroupCommandOptions(cgcArgsExec))
+
+    TestUtils.waitUntilTrue(() => {
+        val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
+        assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
+    }, "Expected the consumer group to reset to offset 0 (earliest) by file.")
+
+    printConsumerGroup()
+    AdminUtils.deleteTopic(zkUtils, topic1)
+    consumerGroupCommand.close()
+    consumerGroupCommandExec.close()
+  }
+
+  // Runs the ConsumerGroupCommand entry point with --describe for `group`.
+  private def printConsumerGroup() {
+    ConsumerGroupCommand.main(Array("--bootstrap-server", brokerList, "--group", group, "--describe"))
+  }
+
+}
+
+
+// Runnable that subscribes a KafkaConsumer to `topic` and polls in a loop until shutdown() wakes it up.
+class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable {
+  val props = new Properties
+  props.setProperty("bootstrap.servers", broker)
+  props.setProperty("group.id", groupId)
+  props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
+  props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
+  val consumer = new KafkaConsumer(props)
+
+  def run() {
+    try {
+      consumer.subscribe(Collections.singleton(topic))
+      // The loop has no exit condition of its own; it ends when poll throws.
+      while (true) consumer.poll(Long.MaxValue)
+    } catch {
+      case _: WakeupException => // expected once shutdown() has called wakeup()
+    } finally {
+      consumer.close()
+    }
+  }
+
+  // Calls wakeup() on the consumer used by run().
+  def shutdown() { consumer.wakeup() }
+}
+
+// Starts `numConsumers` ConsumerThreads for `groupId`/`topic` on a fixed-size pool and registers a JVM
+// shutdown hook that calls shutdown().
+class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
+  val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
+  var consumers: List[ConsumerThread] = Nil
+
+  (1 to numConsumers).foreach { index =>
+    val worker = new ConsumerThread(broker, index, groupId, topic)
+    consumers = consumers :+ worker
+    executor.submit(worker)
+  }
+
+  Runtime.getRuntime.addShutdownHook(new Thread() {
+    override def run() { shutdown() }
+  })
+
+  // Signals every consumer to stop, then waits up to 5000 ms for the pool to terminate.
+  def shutdown() {
+    consumers.foreach(worker => worker.shutdown())
+    executor.shutdown()
+    try executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
+    catch {
+      case interrupted: InterruptedException =>
+        interrupted.printStackTrace()
+    }
+  }
+}