Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/14 17:38:02 UTC

[kafka] branch trunk updated: MINOR: Use KafkaConsumer in GetOffsetShell (#5220)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a423b1d  MINOR: Use KafkaConsumer in GetOffsetShell (#5220)
a423b1d is described below

commit a423b1d07d7e0ec70ff8b380f944368e6ebfb53b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 14 10:37:35 2018 -0700

    MINOR: Use KafkaConsumer in GetOffsetShell (#5220)
    
    This does the minimal amount of work needed for the tool to
    rely on public, non-deprecated APIs (i.e. it no longer depends
    on the old Scala clients code).
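
    For reference, a minimal sketch of the offset lookup pattern the
    tool now uses (the broker address and topic name below are
    illustrative, not part of this change):

        import java.util.Properties
        import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
        import org.apache.kafka.common.TopicPartition
        import org.apache.kafka.common.serialization.ByteArrayDeserializer
        import scala.collection.JavaConverters._

        val config = new Properties
        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
        // endOffsets returns the latest offsets; beginningOffsets the earliest
        val partitions = Seq(new TopicPartition("test", 0)).asJava
        consumer.endOffsets(partitions).asScala.foreach { case (tp, offset) =>
          println(s"${tp.topic}:${tp.partition}:$offset")
        }
        consumer.close()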
    
    Additional improvements (not included here) have
    been proposed via KIP-308.
    
    There are a few other PRs that touch this class with
    overlapping goals:
    
    - https://github.com/apache/kafka/pull/2891
    - https://github.com/apache/kafka/pull/3051
    - https://github.com/apache/kafka/pull/3320
    
    One of them remains relevant in the context of KIP-308, but
    the others have been superseded. I included the authors of
    the 3 PRs as co-authors.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>, Vahid Hashemian <va...@us.ibm.com>, Manikumar Reddy <ma...@gmail.com>
    
    Co-authored-by: Arseniy Tashoyan <ta...@gmail.com>
    Co-authored-by: Vahid Hashemian <va...@us.ibm.com>
    Co-authored-by: Mohammed Amine GARMES
    Co-authored-by: Ismael Juma <is...@juma.me.uk>
---
 .../main/scala/kafka/tools/GetOffsetShell.scala    | 127 ++++++++++++++-------
 1 file changed, 85 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 4104ded..eafddc6 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -18,13 +18,16 @@
  */
 package kafka.tools
 
-import kafka.consumer._
+import java.util.Properties
+
 import joptsimple._
-import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
-import kafka.common.TopicAndPartition
-import kafka.client.ClientUtils
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
+import scala.collection.JavaConverters._
 
 object GetOffsetShell {
 
@@ -47,20 +50,20 @@ object GetOffsetShell {
                            .withRequiredArg
                            .describedAs("timestamp/-1(latest)/-2(earliest)")
                            .ofType(classOf[java.lang.Long])
-                           .defaultsTo(-1)
-    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
+                           .defaultsTo(-1L)
+    parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
                            .withRequiredArg
                            .describedAs("count")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
-    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+    parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
                            .withRequiredArg
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
-                           
-   if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
+
+   if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")
 
     val options = parser.parse(args : _*)
 
@@ -69,41 +72,81 @@ object GetOffsetShell {
     val clientId = "GetOffsetShell"
     val brokerList = options.valueOf(brokerListOpt)
     ToolsUtils.validatePortOrDie(parser, brokerList)
-    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
     val topic = options.valueOf(topicOpt)
-    val partitionList = options.valueOf(partitionOpt)
-    val time = options.valueOf(timeOpt).longValue
-    val nOffsets = options.valueOf(nOffsetsOpt).intValue
-    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
-
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
-      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
-        "kafka-list-topic.sh to verify")
-      Exit.exit(1)
-    }
-    val partitions =
-      if(partitionList == "") {
-        topicsMetadata.head.partitionsMetadata.map(_.partitionId)
-      } else {
-        partitionList.split(",").map(_.toInt).toSeq
-      }
-    partitions.foreach { partitionId =>
-      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
-      partitionMetadataOpt match {
-        case Some(metadata) =>
-          metadata.leader match {
-            case Some(leader) =>
-              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
-              val topicAndPartition = TopicAndPartition(topic, partitionId)
-              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
-              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
-
-              println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
-            case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
+    val partitionIdsRequested: Set[Int] = {
+      val partitionsString = options.valueOf(partitionOpt)
+      if (partitionsString.isEmpty)
+        Set.empty
+      else
+        partitionsString.split(",").map { partitionString =>
+          try partitionString.toInt
+          catch {
+            case _: NumberFormatException =>
+              System.err.println(s"--partitions expects a comma separated list of numeric partition ids, but received: $partitionsString")
+              Exit.exit(1)
           }
-        case None => System.err.println("Error: partition %d does not exist".format(partitionId))
+        }.toSet
+    }
+    val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+
+    val config = new Properties
+    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+    val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
+      case None =>
+        System.err.println(s"Topic $topic does not exist")
+        Exit.exit(1)
+      case Some(p) if p.isEmpty =>
+        if (partitionIdsRequested.isEmpty)
+          System.err.println(s"Topic $topic has 0 partitions")
+        else
+          System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
+        Exit.exit(1)
+      case Some(p) => p
+    }
+
+    if (partitionIdsRequested.nonEmpty) {
+      (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
+        System.err.println(s"Error: partition $partitionId does not exist")
       }
     }
+
+    val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
+      if (p.leader == null) {
+        System.err.println(s"Error: partition ${p.partition} does not have a leader. Skip getting offsets")
+        None
+      } else
+        Some(new TopicPartition(p.topic, p.partition))
+    }
+
+    /* Note that the value of the map can be null */
+    val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match {
+      case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala
+      case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
+      case _ =>
+        val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
+        consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset)
+    }
+
+    partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
+      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    }
+
+  }
+
+  /**
+   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   */
+  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
+    val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer
+    if (partitionInfos.isEmpty)
+      None
+    else if (partitionIds.isEmpty)
+      Some(partitionInfos)
+    else
+      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
   }
+
 }
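
One note on the offsetsForTimes path above: unlike beginningOffsets and
endOffsets, it returns null for a partition that has no message with a
timestamp at or after the requested one, which is why the tool passes the
null through and prints an empty offset field. A minimal, self-contained
sketch of that lookup (broker address, topic and timestamp are
illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import scala.collection.JavaConverters._

    val config = new Properties
    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
    val tp = new TopicPartition("test", 0)
    val timestampsToSearch = Map(tp -> (1528998000000L: java.lang.Long)).asJava
    // The value is the first offset whose timestamp is >= the requested
    // one, or null if the partition has no such message
    val result = consumer.offsetsForTimes(timestampsToSearch).asScala
    println(s"${tp.topic}:${tp.partition}:${Option(result(tp)).map(_.offset).getOrElse("")}")
    consumer.close()

To try the tool itself, an invocation along the lines of

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

should print one test:<partition>:<offset> line per partition.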
