You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/03 15:39:48 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11173: MINOR: Support max timestamp in GetOffsetShell

dajac commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r762038197



##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -222,11 +240,19 @@ object GetOffsetShell {
   }
 
   /**
-   * Return the partition infos. Filter them with topicPartitionFilter.
+   * Return the partition infos. Filter them with topicFilter and topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(client: Admin,
+                                 topicFilter: String => Boolean,
+                                 topicPartitionFilter: PartitionInfo => Boolean,
+                                 excludeInternalTopics: Boolean): Seq[PartitionInfo] = {
+    val topics = client.listTopics(new ListTopicsOptions().listInternal(!excludeInternalTopics)).names().get().asScala.filter(topicFilter)

Review comment:
       nit: This line is quite hard to read. Could we extract `new ListTopicsOptions().listInternal(!excludeInternalTopics)` in a variable? You can also remove the parenthesis of all the getters (e.g. names, get).

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -195,16 +205,24 @@ object GetOffsetShell {
         val upperRange = group(4).map(_.toInt).getOrElse(Int.MaxValue)
         (p: Int) => p >= lowerRange && p < upperRange
     }
-
-    tp => topicFilter.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter(tp.partition)
+    (
+      topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics),
+      tp => topicFilter.isTopicAllowed(tp.topic, excludeInternalTopics) && partitionFilter(tp.partition)
+    )

Review comment:
       The code is getting a bit complicated with all those filter functions. I wonder if we should introduce an abstraction for them now. Something along the lines of the following. The name are not very good but the idea is there. What do you think?
   
   ```scala
   trait TopicPartitionFilter {
     def isTopicAllowed(topic: String): Boolean
     def isPartitionAllowed(partition: PartitionInfo): Boolean
   }
   
   case class UniqueTopicPartitionFilter(
     topicRegex: String,
     partition: Int
   ) extends TopicPartitionFilter {
    ...
   }
   
   case class RangeTopicPartitionFilter(
     topicRegex: String,
     lower: Int,
     upper: Int
   ) extends TopicPartitionFilter {
     ...
   }
   
   case class SetTopicPartitionFilter(
     topic: String,
     partitionIds: Set[Int]
   ) extends TopicPartitionFilter {
     ...
   }
   
   case class CompositePartitionFilter(
     filters: List[TopicPartitionFilter]
   ) extends TopicPartitionFilter {
     ...
   }
   ```

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -164,20 +167,27 @@ object GetOffsetShell {
   }
 
   /**
-   * Creates a topic-partition filter based on a list of patterns.
+   * Creates a topic filter and a topic-partition filter based on a list of patterns.
    * Expected format:
    * List: TopicPartitionPattern(, TopicPartitionPattern)*
    * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
    * TopicPattern: REGEX
    * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
    */
-  def createTopicPartitionFilterWithPatternList(topicPartitions: String, excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+  def createTopicPartitionFilterWithPatternList(topicPartitions: String,
+                                                excludeInternalTopics: Boolean
+                                               ): (String => Boolean, PartitionInfo => Boolean) = {

Review comment:
       nit: Could we format methods as follow? We tend to follow this pattern nowadays.
   
   ```scala
   def createTopicPartitionFilterWithPatternList(
     topicPartitions: String,
     excludeInternalTopics: Boolean
   ): (String => Boolean, PartitionInfo => Boolean) = {
   
   }
   ```

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -222,11 +240,19 @@ object GetOffsetShell {
   }
 
   /**
-   * Return the partition infos. Filter them with topicPartitionFilter.
+   * Return the partition infos. Filter them with topicFilter and topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(client: Admin,
+                                 topicFilter: String => Boolean,
+                                 topicPartitionFilter: PartitionInfo => Boolean,
+                                 excludeInternalTopics: Boolean): Seq[PartitionInfo] = {
+    val topics = client.listTopics(new ListTopicsOptions().listInternal(!excludeInternalTopics)).names().get().asScala.filter(topicFilter)
+    client.describeTopics(topics.asJavaCollection).allTopicNames().get().asScala.flatMap { case (topic, description) =>
+      description
+        .partitions()
+        .asScala
+        .map(tp => new PartitionInfo(topic, tp.partition(), tp.leader(), tp.replicas().asScala.toArray, tp.isr().asScala.toArray))

Review comment:
       Instead of converting to `PartitionInfo`, could we adapt the filters to work with what we have here?

##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -123,10 +123,10 @@ object GetOffsetShell {
       new Properties
     config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

Review comment:
       Should we use `AdminConfig`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org