You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/09/08 19:21:11 UTC

[kafka] branch 3.0 updated: KAFKA-13276: Prefer KafkaFuture in admin Result constructors (#11301)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new c24b804  KAFKA-13276: Prefer KafkaFuture in admin Result constructors (#11301)
c24b804 is described below

commit c24b804fcb9552ec9f3992bfaa52c11657656018
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Wed Sep 8 20:15:44 2021 +0100

    KAFKA-13276: Prefer KafkaFuture in admin Result constructors (#11301)
    
    Avoid using the non-public API KafkaFutureImpl in the Admin client's `*Result` class constructors.
    This is particularly problematic for `DescribeConsumerGroupsResult` which currently has a
    public constructor.  For the other classes the rationale is simply consistency with the majority of
    the `*Result` classes.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk, David Jacot <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 .../org/apache/kafka/clients/admin/AbortTransactionResult.java |  5 ++---
 .../apache/kafka/clients/admin/DeleteConsumerGroupsResult.java |  8 +++-----
 .../kafka/clients/admin/DescribeConsumerGroupsResult.java      | 10 ++++------
 .../apache/kafka/clients/admin/DescribeProducersResult.java    |  7 +++----
 .../apache/kafka/clients/admin/DescribeTransactionsResult.java |  7 +++----
 .../org/apache/kafka/clients/admin/ElectLeadersResult.java     |  4 ++--
 .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java  |  6 ++++--
 .../apache/kafka/clients/admin/ListConsumerGroupsResult.java   |  2 +-
 .../org/apache/kafka/clients/admin/ListTransactionsResult.java |  4 ++--
 .../apache/kafka/clients/admin/internals/AdminApiFuture.java   | 10 ++++++----
 .../kafka/clients/admin/internals/AdminApiDriverTest.java      |  8 ++++----
 .../test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala |  5 ++---
 12 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
index 451595b..30cbcee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.Map;
 
@@ -30,9 +29,9 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class AbortTransactionResult {
-    private final Map<TopicPartition, KafkaFutureImpl<Void>> futures;
+    private final Map<TopicPartition, KafkaFuture<Void>> futures;
 
-    AbortTransactionResult(Map<TopicPartition, KafkaFutureImpl<Void>> futures) {
+    AbortTransactionResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
         this.futures = futures;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index bf4c45c..90ddbd0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -32,9 +30,9 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-    private final Map<CoordinatorKey, KafkaFutureImpl<Void>> futures;
+    private final Map<String, KafkaFuture<Void>> futures;
 
-    DeleteConsumerGroupsResult(final Map<CoordinatorKey, KafkaFutureImpl<Void>> futures) {
+    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
         this.futures = futures;
     }
 
@@ -44,7 +42,7 @@ public class DeleteConsumerGroupsResult {
      */
     public Map<String, KafkaFuture<Void>> deletedGroups() {
         Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
-        futures.forEach((key, future) -> deletedGroups.put(key.idValue, future));
+        futures.forEach((key, future) -> deletedGroups.put(key, future));
         return deletedGroups;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index a6881c4..8940060 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -35,9 +33,9 @@ import java.util.concurrent.ExecutionException;
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsResult {
 
-    private final Map<CoordinatorKey, KafkaFutureImpl<ConsumerGroupDescription>> futures;
+    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
 
-    public DescribeConsumerGroupsResult(final Map<CoordinatorKey, KafkaFutureImpl<ConsumerGroupDescription>> futures) {
+    public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
         this.futures = futures;
     }
 
@@ -46,7 +44,7 @@ public class DescribeConsumerGroupsResult {
      */
     public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() {
         Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = new HashMap<>();
-        futures.forEach((key, future) -> describedGroups.put(key.idValue, future));
+        futures.forEach((key, future) -> describedGroups.put(key, future));
         return describedGroups;
     }
 
@@ -59,7 +57,7 @@ public class DescribeConsumerGroupsResult {
                 Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(futures.size());
                 futures.forEach((key, future) -> {
                     try {
-                        descriptions.put(key.idValue, future.get());
+                        descriptions.put(key, future.get());
                     } catch (InterruptedException | ExecutionException e) {
                         // This should be unreachable, since the KafkaFuture#allOf already ensured
                         // that all of the futures completed successfully.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java
index 55897bf..13977c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.HashMap;
 import java.util.List;
@@ -30,9 +29,9 @@ import java.util.concurrent.ExecutionException;
 @InterfaceStability.Evolving
 public class DescribeProducersResult {
 
-    private final Map<TopicPartition, KafkaFutureImpl<PartitionProducerState>> futures;
+    private final Map<TopicPartition, KafkaFuture<PartitionProducerState>> futures;
 
-    DescribeProducersResult(Map<TopicPartition, KafkaFutureImpl<PartitionProducerState>> futures) {
+    DescribeProducersResult(Map<TopicPartition, KafkaFuture<PartitionProducerState>> futures) {
         this.futures = futures;
     }
 
@@ -49,7 +48,7 @@ public class DescribeProducersResult {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
             .thenApply(nil -> {
                 Map<TopicPartition, PartitionProducerState> results = new HashMap<>(futures.size());
-                for (Map.Entry<TopicPartition, KafkaFutureImpl<PartitionProducerState>> entry : futures.entrySet()) {
+                for (Map.Entry<TopicPartition, KafkaFuture<PartitionProducerState>> entry : futures.entrySet()) {
                     try {
                         results.put(entry.getKey(), entry.getValue().get());
                     } catch (InterruptedException | ExecutionException e) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
index edacc9f..278a254 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -28,9 +27,9 @@ import java.util.concurrent.ExecutionException;
 
 @InterfaceStability.Evolving
 public class DescribeTransactionsResult {
-    private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures;
+    private final Map<CoordinatorKey, KafkaFuture<TransactionDescription>> futures;
 
-    DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) {
+    DescribeTransactionsResult(Map<CoordinatorKey, KafkaFuture<TransactionDescription>> futures) {
         this.futures = futures;
     }
 
@@ -66,7 +65,7 @@ public class DescribeTransactionsResult {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
             .thenApply(nil -> {
                 Map<String, TransactionDescription> results = new HashMap<>(futures.size());
-                for (Map.Entry<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> entry : futures.entrySet()) {
+                for (Map.Entry<CoordinatorKey, KafkaFuture<TransactionDescription>> entry : futures.entrySet()) {
                     try {
                         results.put(entry.getKey().idValue, entry.getValue().get());
                     } catch (InterruptedException | ExecutionException e) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
index 186c584..548c94c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
@@ -35,9 +35,9 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
  */
 @InterfaceStability.Evolving
 final public class ElectLeadersResult {
-    private final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture;
+    private final KafkaFuture<Map<TopicPartition, Optional<Throwable>>> electionFuture;
 
-    ElectLeadersResult(KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture) {
+    ElectLeadersResult(KafkaFuture<Map<TopicPartition, Optional<Throwable>>> electionFuture) {
         this.electionFuture = electionFuture;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f8d9e57..09c168d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3093,7 +3093,8 @@ public class KafkaAdminClient extends AdminClient {
                 DescribeConsumerGroupsHandler.newFuture(groupIds);
         DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(options.includeAuthorizedOperations(), logContext);
         invokeDriver(handler, future, options.timeoutMs);
-        return new DescribeConsumerGroupsResult(future.all());
+        return new DescribeConsumerGroupsResult(future.all().entrySet().stream()
+                .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
     }
 
     /**
@@ -3293,7 +3294,8 @@ public class KafkaAdminClient extends AdminClient {
                 DeleteConsumerGroupsHandler.newFuture(groupIds);
         DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
         invokeDriver(handler, future, options.timeoutMs);
-        return new DeleteConsumerGroupsResult(future.all());
+        return new DeleteConsumerGroupsResult(future.all().entrySet().stream()
+                .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index 7732ec9..2d1c612 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -35,7 +35,7 @@ public class ListConsumerGroupsResult {
     private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
     private final KafkaFutureImpl<Collection<Throwable>> errors;
 
-    ListConsumerGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
+    ListConsumerGroupsResult(KafkaFuture<Collection<Object>> future) {
         this.all = new KafkaFutureImpl<>();
         this.valid = new KafkaFutureImpl<>();
         this.errors = new KafkaFutureImpl<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
index 4b9d4ee..c9670db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
@@ -35,9 +35,9 @@ import java.util.Set;
  */
 @InterfaceStability.Evolving
 public class ListTransactionsResult {
-    private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
+    private final KafkaFuture<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
 
-    ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+    ListTransactionsResult(KafkaFuture<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
         this.future = future;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
index 8ddfb83..b0294d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.admin.internals;
 
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.Map;
@@ -76,7 +77,7 @@ public interface AdminApiFuture<K, V> {
      * This class can be used when the set of keys is known ahead of time.
      */
     class SimpleAdminApiFuture<K, V> implements AdminApiFuture<K, V> {
-        private final Map<K, KafkaFutureImpl<V>> futures;
+        private final Map<K, KafkaFuture<V>> futures;
 
         public SimpleAdminApiFuture(Set<K> keys) {
             this.futures = keys.stream().collect(Collectors.toMap(
@@ -109,7 +110,8 @@ public interface AdminApiFuture<K, V> {
         }
 
         private KafkaFutureImpl<V> futureOrThrow(K key) {
-            KafkaFutureImpl<V> future = futures.get(key);
+            // The below typecast is safe because we initialise futures using only KafkaFutureImpl.
+            KafkaFutureImpl<V> future = (KafkaFutureImpl<V>) futures.get(key);
             if (future == null) {
                 throw new IllegalArgumentException("Attempt to complete future for " + key +
                     ", which was not requested");
@@ -118,11 +120,11 @@ public interface AdminApiFuture<K, V> {
             }
         }
 
-        public Map<K, KafkaFutureImpl<V>> all() {
+        public Map<K, KafkaFuture<V>> all() {
             return futures;
         }
 
-        public KafkaFutureImpl<V> get(K key) {
+        public KafkaFuture<V> get(K key) {
             return futures.get(key);
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
index 93a4fa8..6ff393f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
@@ -19,10 +19,10 @@ package org.apache.kafka.clients.admin.internals;
 import org.apache.kafka.clients.admin.internals.AdminApiDriver.RequestSpec;
 import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
 import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.LookupResult;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -498,7 +498,7 @@ class AdminApiDriverTest {
     ) {
         OptionalInt brokerIdOpt = context.driver.keyToBrokerId(key);
         assertEquals(OptionalInt.empty(), brokerIdOpt);
-        KafkaFutureImpl<Long> future = context.future.all().get(key);
+        KafkaFuture<Long> future = context.future.all().get(key);
         assertFalse(future.isDone());
     }
 
@@ -507,7 +507,7 @@ class AdminApiDriverTest {
         String key,
         Throwable expectedException
     ) {
-        KafkaFutureImpl<Long> future = context.future.all().get(key);
+        KafkaFuture<Long> future = context.future.all().get(key);
         assertTrue(future.isCompletedExceptionally());
         Throwable exception = assertThrows(ExecutionException.class, future::get);
         assertEquals(expectedException, exception.getCause());
@@ -518,7 +518,7 @@ class AdminApiDriverTest {
         String key,
         Long expected
     ) {
-        KafkaFutureImpl<Long> future = context.future.all().get(key);
+        KafkaFuture<Long> future = context.future.all().get(key);
         assertTrue(future.isDone());
         try {
             assertEquals(expected, future.get());
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index c9b2f0b..76a3855 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -32,7 +32,6 @@ import org.mockito.Mockito._
 import org.mockito.ArgumentMatcher
 
 import scala.jdk.CollectionConverters._
-import org.apache.kafka.clients.admin.internals.CoordinatorKey
 import org.apache.kafka.common.internals.KafkaFutureImpl
 
 class ConsumerGroupServiceTest {
@@ -112,7 +111,7 @@ class ConsumerGroupServiceTest {
     val future = new KafkaFutureImpl[ConsumerGroupDescription]()
     future.complete(consumerGroupDescription)
     when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
-      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)))
+      .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, future)))
     when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
       .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
     when(admin.listOffsets(
@@ -190,7 +189,7 @@ class ConsumerGroupServiceTest {
       new Node(1, "localhost", 9092))
     val future = new KafkaFutureImpl[ConsumerGroupDescription]()
     future.complete(description)
-    new DescribeConsumerGroupsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future))
+    new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))
   }
 
   private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {