You are viewing a plain text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/14 17:38:02 UTC
[kafka] branch trunk updated: MINOR: Use KafkaConsumer in GetOffsetShell (#5220)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a423b1d MINOR: Use KafkaConsumer in GetOffsetShell (#5220)
a423b1d is described below
commit a423b1d07d7e0ec70ff8b380f944368e6ebfb53b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 14 10:37:35 2018 -0700
MINOR: Use KafkaConsumer in GetOffsetShell (#5220)
This does the minimal amount of work needed for the tool to
rely only on public, non-deprecated APIs (i.e., it no longer
relies on the Scala client code).
Additional improvements (not included here) have
been proposed via KIP-308.
There are a few other PRs that touch this class with
overlapping goals:
- https://github.com/apache/kafka/pull/2891
- https://github.com/apache/kafka/pull/3051
- https://github.com/apache/kafka/pull/3320
One of them remains relevant in the context of KIP-308, but
the other two have been superseded. I included the authors of
all three PRs as co-authors.
Reviewers: Rajini Sivaram <ra...@googlemail.com>, Vahid Hashemian <va...@us.ibm.com>, Manikumar Reddy <ma...@gmail.com>
Co-authored-by: Arseniy Tashoyan <ta...@gmail.com>
Co-authored-by: Vahid Hashemian <va...@us.ibm.com>
Co-authored-by: Mohammed Amine GARMES
Co-authored-by: Ismael Juma <is...@juma.me.uk>
---
.../main/scala/kafka/tools/GetOffsetShell.scala | 127 ++++++++++++++-------
1 file changed, 85 insertions(+), 42 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 4104ded..eafddc6 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -18,13 +18,16 @@
*/
package kafka.tools
-import kafka.consumer._
+import java.util.Properties
+
import joptsimple._
-import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
-import kafka.common.TopicAndPartition
-import kafka.client.ClientUtils
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import scala.collection.JavaConverters._
object GetOffsetShell {
@@ -47,20 +50,20 @@ object GetOffsetShell {
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
- .defaultsTo(-1)
- val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
+ .defaultsTo(-1L)
+ parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
- val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+ parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
-
- if(args.length == 0)
- CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
+
+ if (args.length == 0)
+ CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")
val options = parser.parse(args : _*)
@@ -69,41 +72,81 @@ object GetOffsetShell {
val clientId = "GetOffsetShell"
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser, brokerList)
- val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topic = options.valueOf(topicOpt)
- val partitionList = options.valueOf(partitionOpt)
- val time = options.valueOf(timeOpt).longValue
- val nOffsets = options.valueOf(nOffsetsOpt).intValue
- val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
-
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
- if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
- System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
- "kafka-list-topic.sh to verify")
- Exit.exit(1)
- }
- val partitions =
- if(partitionList == "") {
- topicsMetadata.head.partitionsMetadata.map(_.partitionId)
- } else {
- partitionList.split(",").map(_.toInt).toSeq
- }
- partitions.foreach { partitionId =>
- val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
- partitionMetadataOpt match {
- case Some(metadata) =>
- metadata.leader match {
- case Some(leader) =>
- val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
- val topicAndPartition = TopicAndPartition(topic, partitionId)
- val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
- val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
-
- println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
- case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
+ val partitionIdsRequested: Set[Int] = {
+ val partitionsString = options.valueOf(partitionOpt)
+ if (partitionsString.isEmpty)
+ Set.empty
+ else
+ partitionsString.split(",").map { partitionString =>
+ try partitionString.toInt
+ catch {
+ case _: NumberFormatException =>
+ System.err.println(s"--partitions expects a comma separated list of numeric partition ids, but received: $partitionsString")
+ Exit.exit(1)
}
- case None => System.err.println("Error: partition %d does not exist".format(partitionId))
+ }.toSet
+ }
+ val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+
+ val config = new Properties
+ config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+ val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+ val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
+ case None =>
+ System.err.println(s"Topic $topic does not exist")
+ Exit.exit(1)
+ case Some(p) if p.isEmpty =>
+ if (partitionIdsRequested.isEmpty)
+ System.err.println(s"Topic $topic has 0 partitions")
+ else
+ System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
+ Exit.exit(1)
+ case Some(p) => p
+ }
+
+ if (partitionIdsRequested.nonEmpty) {
+ (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
+ System.err.println(s"Error: partition $partitionId does not exist")
}
}
+
+ val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
+ if (p.leader == null) {
+ System.err.println(s"Error: partition ${p.partition} does not have a leader. Skip getting offsets")
+ None
+ } else
+ Some(new TopicPartition(p.topic, p.partition))
+ }
+
+ /* Note that the value of the map can be null */
+ val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match {
+ case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala
+ case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
+ case _ =>
+ val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
+ consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset)
+ }
+
+ partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
+ println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+ }
+
+ }
+
+ /**
+ * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+ */
+ private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
+ val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer
+ if (partitionInfos.isEmpty)
+ None
+ else if (partitionIds.isEmpty)
+ Some(partitionInfos)
+ else
+ Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
}
+
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.