Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/02 11:43:56 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10954: WIP KIP-709: Implement batching for fetchOffsets

rajinisivaram commented on a change in pull request #10954:
URL: https://github.com/apache/kafka/pull/10954#discussion_r662935021



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
##########
@@ -17,37 +17,51 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.List;
 
 /**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(List)}.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
 
-    private List<TopicPartition> topicPartitions = null;
+    private Map<String, List<TopicPartition>> groupToTopicPartitions = new HashMap<>();
 
     /**
-     * Set the topic partitions to list as part of the result.
-     * {@code null} includes all topic partitions.
-     *
-     * @param topicPartitions List of topic partitions to include
+     * Default constructor for {@code ListConsumerGroupOffsetsOptions}. Sets the topic partitions
+     * to fetch for each group id to {@code null}, which indicates to fetch all offsets for all
+     * topic partitions for that group.
+     * */
+    public ListConsumerGroupOffsetsOptions(List<String> groupIds) {

Review comment:
       This is a breaking change to a public API, since it removes the default constructor. In any case, we don't really want this in the constructor; we should add methods for whatever we need. Looking at the rest of the changes in this class, we are actually repurposing an existing public API by changing all of its methods, so we need to completely rethink this change.
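A minimal sketch of the additive approach, assuming the existing no-arg constructor and the `topicPartitions(List)` setter stay exactly as they are. `OffsetsOptionsSketch` is a hypothetical stand-in for `ListConsumerGroupOffsetsOptions`, with partitions written as plain strings such as `orders-0` instead of `TopicPartition`, so it is not the real Kafka class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ListConsumerGroupOffsetsOptions (not the real Kafka class).
// Partitions are plain strings such as "orders-0" to keep the sketch dependency-free.
class OffsetsOptionsSketch {
    // null keeps the existing meaning: fetch offsets for all partitions of the group.
    private List<String> topicPartitions = null;

    // No explicit constructor is declared, so the implicit no-arg constructor
    // remains and existing `new OffsetsOptionsSketch()` call sites keep compiling.

    // Existing fluent setter, left unchanged in name and meaning.
    OffsetsOptionsSketch topicPartitions(List<String> topicPartitions) {
        this.topicPartitions = topicPartitions == null
                ? null
                : Collections.unmodifiableList(new ArrayList<>(topicPartitions));
        return this;
    }

    List<String> topicPartitions() {
        return topicPartitions;
    }
}
```

Any new setting would be added as another fluent method alongside `topicPartitions`, rather than as a constructor parameter.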

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
##########
@@ -17,37 +17,51 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.List;
 
 /**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(List)}.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {
 
-    private List<TopicPartition> topicPartitions = null;
+    private Map<String, List<TopicPartition>> groupToTopicPartitions = new HashMap<>();
 
     /**
-     * Set the topic partitions to list as part of the result.
-     * {@code null} includes all topic partitions.
-     *
-     * @param topicPartitions List of topic partitions to include
+     * Default constructor for {@code ListConsumerGroupOffsetsOptions}. Sets the topic partitions
+     * to fetch for each group id to {@code null}, which indicates to fetch all offsets for all
+     * topic partitions for that group.
+     * */
+    public ListConsumerGroupOffsetsOptions(List<String> groupIds) {
+        for (String group : groupIds) {
+            groupToTopicPartitions.put(group, null);
+        }
+    }
+
+    /**
+     * Set the topic partitions for each group we want to fetch offsets for as part of the result.
+     * {@code null} mapping for a specific group id means to fetch offsets for all topic
+     * partitions for that specific group.
+     * @param groupToTopicPartitions Map of group id to list of topic partitions to fetch offsets
+     *                              for.
      * @return This ListGroupOffsetsOptions
      */
-    public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
-        this.topicPartitions = topicPartitions;
+    public ListConsumerGroupOffsetsOptions groupToTopicPartitions(Map<String, List<TopicPartition>> groupToTopicPartitions) {

Review comment:
       We are using options in an inconsistent way here compared to other APIs. A good example to follow would be:
   ```
   public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
                                        ListOffsetsOptions options)
   ```
   There, the options are additional settings that apply to the request as a whole, while the data for the request comes from the first argument. We could do something similar for listConsumerGroupOffsets.
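A sketch of the `listOffsets`-style shape for this API, under stated assumptions: `GroupSpecSketch`, `GroupOffsetsOptionsSketch` and `AdminShapeSketch.describeRequest` are hypothetical names, not existing Kafka types, and partitions are plain strings. Per-group data travels in the map passed as the first argument, while the options object only carries a request-wide setting (modeled loosely on the require-stable flag that OffsetFetchRequest gained in version 7).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-group request data, playing the role OffsetSpec plays in listOffsets:
// the partitions to fetch for one group, or null for all partitions.
final class GroupSpecSketch {
    private final List<String> topicPartitions;

    GroupSpecSketch(List<String> topicPartitions) {
        this.topicPartitions = topicPartitions;
    }

    List<String> topicPartitions() {
        return topicPartitions;
    }
}

// Hypothetical options holding only request-wide settings.
class GroupOffsetsOptionsSketch {
    private boolean requireStable = false;

    GroupOffsetsOptionsSketch requireStable(boolean requireStable) {
        this.requireStable = requireStable;
        return this;
    }

    boolean requireStable() {
        return requireStable;
    }
}

final class AdminShapeSketch {
    private AdminShapeSketch() { }

    // Hypothetical listConsumerGroupOffsets-style method: the first argument says
    // what to fetch per group; the options only adjust how the request is sent.
    // Returns a readable description of each group's request, sorted by group id.
    static Map<String, String> describeRequest(Map<String, GroupSpecSketch> groupSpecs,
                                               GroupOffsetsOptionsSketch options) {
        Map<String, String> perGroup = new TreeMap<>();
        for (Map.Entry<String, GroupSpecSketch> entry : groupSpecs.entrySet()) {
            List<String> partitions = entry.getValue().topicPartitions();
            String what = partitions == null ? "all" : String.join(",", partitions);
            perGroup.put(entry.getKey(), what + (options.requireStable() ? " (stable)" : ""));
        }
        return perGroup;
    }
}
```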

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -109,6 +113,7 @@
     private AtomicBoolean asyncCommitFenced;
     private ConsumerGroupMetadata groupMetadata;
     private final boolean throwOnFetchStableOffsetsUnsupported;
+    private volatile boolean batchFetchOffsets = true;

Review comment:
       I can see this is based on the pattern used in the recent changes to FindCoordinator, but I don't see the point of it. If the broker that happens to be the coordinator at some point hasn't been upgraded, we switch to non-batched mode and stay there until the consumer is restarted. Given that consumers don't need batching, it seems to me that we could avoid any changes in consumers and just update `OffsetFetchRequest` to do the right thing based on versions. @dajac Since you reviewed the PR for FindCoordinator, what do you think?
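A hypothetical sketch of deciding batching per request from the negotiated version, rather than through a sticky `batchFetchOffsets` flag. It assumes version 8 is the first OffsetFetch version that carries multiple groups, as proposed in this PR's `OffsetFetchRequest.json`; `OffsetFetchPlanner` and `plan` are illustrative names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical planner: chooses the request layout on every call from the
// version the current coordinator supports, so no state is carried over.
final class OffsetFetchPlanner {
    // Assumption: v8 is the first multi-group OffsetFetch version (as proposed in this PR).
    static final short FIRST_BATCHED_VERSION = 8;

    private OffsetFetchPlanner() { }

    // Returns the group ids for each request to send: a single request with all
    // groups on v8+, otherwise one single-group request per group.
    static List<List<String>> plan(List<String> groupIds, short maxSupportedVersion) {
        List<List<String>> requests = new ArrayList<>();
        if (maxSupportedVersion >= FIRST_BATCHED_VERSION) {
            requests.add(new ArrayList<>(groupIds));
        } else {
            for (String groupId : groupIds) {
                requests.add(Collections.singletonList(groupId));
            }
        }
        return requests;
    }
}
```

Since the choice is made on every call, a client talking to a coordinator that is later upgraded would start batching without a restart.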

##########
File path: clients/src/main/resources/common/message/OffsetFetchResponse.json
##########
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", 
+    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7",
       "about": "The responses per topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
+      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7",
         "about": "The responses per partition", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+        { "name": "PartitionIndex", "type": "int32", "versions": "0-7",
           "about": "The partition index." },
-        { "name": "CommittedOffset", "type": "int64", "versions": "0+",
+        { "name": "CommittedOffset", "type": "int64", "versions": "0-7",
           "about": "The committed message offset." },
-        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1",
+        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1",
           "ignorable": true, "about": "The leader epoch." },
-        { "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
+        { "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7",
           "about": "The partition metadata." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        { "name": "ErrorCode", "type": "int16", "versions": "0-7",
           "about": "The error code, or 0 if there was no error." }
       ]}
     ]},
-    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true,
-      "about": "The top-level error code, or 0 if there was no error." }
+    { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
+      "about": "The top-level error code, or 0 if there was no error." },
+    {"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
+      "about": "The responses per group id.", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
+        "about": "The group ID." },
+      { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
+        "about": "The responses per topic.", "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",

Review comment:
       This can't be 0+ for a field added in 8+, can it?
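To make the point concrete, here is a hypothetical containment check over the version strings used in these JSON specs (`0+`, `0-7`, `8+`). It only illustrates the rule, assumed here, that a nested field's versions should fall within its parent's; `VersionRange` is an illustrative class, not the Kafka message generator.

```java
// Hypothetical parser for message-spec version strings such as "0+", "0-7" or "8".
final class VersionRange {
    final short lowest;
    final short highest;

    private VersionRange(short lowest, short highest) {
        this.lowest = lowest;
        this.highest = highest;
    }

    static VersionRange parse(String spec) {
        String s = spec.trim();
        if (s.endsWith("+")) {
            // "N+" means version N and every later version.
            return new VersionRange(Short.parseShort(s.substring(0, s.length() - 1)), Short.MAX_VALUE);
        }
        int dash = s.indexOf('-');
        if (dash > 0) {
            // "N-M" is an inclusive range.
            return new VersionRange(Short.parseShort(s.substring(0, dash)),
                                    Short.parseShort(s.substring(dash + 1)));
        }
        short v = Short.parseShort(s);
        return new VersionRange(v, v);
    }

    // True if every version in this range also exists in `outer`.
    boolean within(VersionRange outer) {
        return lowest >= outer.lowest && highest <= outer.highest;
    }
}
```

With the parent `Topics` field at `8+`, a nested `Name` field declared as `0+` fails the check, while `8+` passes.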

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -888,22 +888,26 @@ default ListConsumerGroupsResult listConsumerGroups() {
     }
 
     /**
-     * List the consumer group offsets available in the cluster.
-     *
+     * List the consumer group offsets available in the cluster for the given list of consumer
+     * groups.
+     * @param groupIds List of consumer group ids to list offsets for.
      * @param options The options to use when listing the consumer group offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groupIds, ListConsumerGroupOffsetsOptions options);
 
     /**
      * List the consumer group offsets available in the cluster with the default options.
      * <p>
-     * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+     * This is a convenience method for
+     * {@link #listConsumerGroupOffsets(List, ListConsumerGroupOffsetsOptions)} with
+     * default options.
      *
+     * @param groupIds List of consumer group ids to list offsets for.
      * @return The ListGroupOffsetsResult.
      */
-    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
-        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groupIds) {

Review comment:
       We need to keep the old method as it was and deprecate it.
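A minimal sketch of keeping the old single-group signature as a deprecated default method that delegates to the new multi-group one. `GroupOffsetsAdminSketch` and `OffsetsResultStub` are hypothetical stand-ins, and the options parameter is omitted for brevity; the real `Admin` interface has more overloads.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the result type; it just records which groups were requested.
final class OffsetsResultStub {
    final List<String> groupIds;

    OffsetsResultStub(List<String> groupIds) {
        this.groupIds = groupIds;
    }
}

interface GroupOffsetsAdminSketch {
    // New multi-group entry point that implementations provide.
    OffsetsResultStub listConsumerGroupOffsets(List<String> groupIds);

    // Old single-group signature kept so existing callers still compile;
    // it is deprecated and simply delegates to the new method.
    @Deprecated
    default OffsetsResultStub listConsumerGroupOffsets(String groupId) {
        return listConsumerGroupOffsets(Collections.singletonList(groupId));
    }
}
```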

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
##########
@@ -17,33 +17,62 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Map;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(List)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<CoordinatorKey, KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>>> futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFutureImpl<Map<TopicPartition,
+        OffsetAndMetadata>>> futures) {
+        this.futures = futures;
     }
 
     /**
-     * Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
-     * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
+     * Return a future which yields a map of group ids to a map of topic partitions to
+     * OffsetAndMetadata objects. If the group doesn't have a committed offset for a specific
+     * partition, the corresponding value in the returned map for that group id will be null.
      */
-    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {

Review comment:
       We need to keep this public method and deprecate it. Perhaps throw an exception if multiple group ids were specified, and retain the existing behaviour for a single group id.
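One way the suggestion could look, as a sketch: `OffsetsResultSketch` is a hypothetical stand-in for `ListConsumerGroupOffsetsResult`, with `CompletableFuture` standing in for `KafkaFuture` and offsets reduced to `Long` values keyed by partition strings. The deprecated no-arg accessor keeps working for a single group and throws when several groups were requested; the per-group accessor is an assumed addition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ListConsumerGroupOffsetsResult (not the real Kafka class).
final class OffsetsResultSketch {
    private final Map<String, CompletableFuture<Map<String, Long>>> futures;

    OffsetsResultSketch(Map<String, CompletableFuture<Map<String, Long>>> futures) {
        // Defensive copy so later changes to the caller's map are not visible here.
        this.futures = Collections.unmodifiableMap(new HashMap<>(futures));
    }

    // Kept for existing single-group callers; fails fast when the result covers
    // several groups, because there is no single map to return.
    @Deprecated
    CompletableFuture<Map<String, Long>> partitionsToOffsetAndMetadata() {
        if (futures.size() != 1) {
            throw new IllegalStateException(
                "partitionsToOffsetAndMetadata() requires exactly one group, but the result has "
                    + futures.size());
        }
        return futures.values().iterator().next();
    }

    // Assumed per-group accessor for the multi-group case.
    CompletableFuture<Map<String, Long>> partitionsToOffsetAndMetadata(String groupId) {
        CompletableFuture<Map<String, Long>> future = futures.get(groupId);
        if (future == null) {
            throw new IllegalArgumentException("Offsets for group '" + groupId + "' were not requested");
        }
        return future;
    }
}
```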

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -888,22 +888,26 @@ default ListConsumerGroupsResult listConsumerGroups() {
     }
 
     /**
-     * List the consumer group offsets available in the cluster.
-     *
+     * List the consumer group offsets available in the cluster for the given list of consumer
+     * groups.
+     * @param groupIds List of consumer group ids to list offsets for.
      * @param options The options to use when listing the consumer group offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groupIds, ListConsumerGroupOffsetsOptions options);

Review comment:
       +1

##########
File path: clients/src/main/resources/common/message/OffsetFetchRequest.json
##########
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
+    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
       "about": "The group to fetch offsets for." },
-    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
+    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
       "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
         "about": "The topic name."},
-      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
         "about": "The partition indexes we would like to fetch offsets for." }
     ]},
+    { "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
+      "about": "Each group we would like to fetch offsets for", "fields": [
+      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
+        "about": "The group ID."},
+      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
+        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",

Review comment:
       This can't be 0+ for a field added in 8+, can it?



