You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/09/08 19:21:11 UTC
[kafka] branch 3.0 updated: KAFKA-13276: Prefer KafkaFuture in
admin Result constructors (#11301)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c24b804 KAFKA-13276: Prefer KafkaFuture in admin Result constructors (#11301)
c24b804 is described below
commit c24b804fcb9552ec9f3992bfaa52c11657656018
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Wed Sep 8 20:15:44 2021 +0100
KAFKA-13276: Prefer KafkaFuture in admin Result constructors (#11301)
Avoid using the non-public API KafkaFutureImpl in the Admin client's `*Result` class constructors.
This is particularly problematic for `DescribeConsumerGroupsResult`, which currently has a
public constructor. For the other classes the rationale is simply consistency with the majority of
the `*Result` classes.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
.../org/apache/kafka/clients/admin/AbortTransactionResult.java | 5 ++---
.../apache/kafka/clients/admin/DeleteConsumerGroupsResult.java | 8 +++-----
.../kafka/clients/admin/DescribeConsumerGroupsResult.java | 10 ++++------
.../apache/kafka/clients/admin/DescribeProducersResult.java | 7 +++----
.../apache/kafka/clients/admin/DescribeTransactionsResult.java | 7 +++----
.../org/apache/kafka/clients/admin/ElectLeadersResult.java | 4 ++--
.../java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 ++++--
.../apache/kafka/clients/admin/ListConsumerGroupsResult.java | 2 +-
.../org/apache/kafka/clients/admin/ListTransactionsResult.java | 4 ++--
.../apache/kafka/clients/admin/internals/AdminApiFuture.java | 10 ++++++----
.../kafka/clients/admin/internals/AdminApiDriverTest.java | 8 ++++----
.../test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala | 5 ++---
12 files changed, 36 insertions(+), 40 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
index 451595b..30cbcee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Map;
@@ -30,9 +29,9 @@ import java.util.Map;
*/
@InterfaceStability.Evolving
public class AbortTransactionResult {
- private final Map<TopicPartition, KafkaFutureImpl<Void>> futures;
+ private final Map<TopicPartition, KafkaFuture<Void>> futures;
- AbortTransactionResult(Map<TopicPartition, KafkaFutureImpl<Void>> futures) {
+ AbortTransactionResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
this.futures = futures;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index bf4c45c..90ddbd0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.clients.admin;
-import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Collection;
import java.util.HashMap;
@@ -32,9 +30,9 @@ import java.util.Map;
*/
@InterfaceStability.Evolving
public class DeleteConsumerGroupsResult {
- private final Map<CoordinatorKey, KafkaFutureImpl<Void>> futures;
+ private final Map<String, KafkaFuture<Void>> futures;
- DeleteConsumerGroupsResult(final Map<CoordinatorKey, KafkaFutureImpl<Void>> futures) {
+ DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
this.futures = futures;
}
@@ -44,7 +42,7 @@ public class DeleteConsumerGroupsResult {
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
- futures.forEach((key, future) -> deletedGroups.put(key.idValue, future));
+ futures.forEach((key, future) -> deletedGroups.put(key, future));
return deletedGroups;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index a6881c4..8940060 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -17,10 +17,8 @@
package org.apache.kafka.clients.admin;
-import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.HashMap;
import java.util.Map;
@@ -35,9 +33,9 @@ import java.util.concurrent.ExecutionException;
@InterfaceStability.Evolving
public class DescribeConsumerGroupsResult {
- private final Map<CoordinatorKey, KafkaFutureImpl<ConsumerGroupDescription>> futures;
+ private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
- public DescribeConsumerGroupsResult(final Map<CoordinatorKey, KafkaFutureImpl<ConsumerGroupDescription>> futures) {
+ public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
this.futures = futures;
}
@@ -46,7 +44,7 @@ public class DescribeConsumerGroupsResult {
*/
public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() {
Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = new HashMap<>();
- futures.forEach((key, future) -> describedGroups.put(key.idValue, future));
+ futures.forEach((key, future) -> describedGroups.put(key, future));
return describedGroups;
}
@@ -59,7 +57,7 @@ public class DescribeConsumerGroupsResult {
Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(futures.size());
futures.forEach((key, future) -> {
try {
- descriptions.put(key.idValue, future.get());
+ descriptions.put(key, future.get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all of the futures completed successfully.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java
index 55897bf..13977c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersResult.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.HashMap;
import java.util.List;
@@ -30,9 +29,9 @@ import java.util.concurrent.ExecutionException;
@InterfaceStability.Evolving
public class DescribeProducersResult {
- private final Map<TopicPartition, KafkaFutureImpl<PartitionProducerState>> futures;
+ private final Map<TopicPartition, KafkaFuture<PartitionProducerState>> futures;
- DescribeProducersResult(Map<TopicPartition, KafkaFutureImpl<PartitionProducerState>> futures) {
+ DescribeProducersResult(Map<TopicPartition, KafkaFuture<PartitionProducerState>> futures) {
this.futures = futures;
}
@@ -49,7 +48,7 @@ public class DescribeProducersResult {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
.thenApply(nil -> {
Map<TopicPartition, PartitionProducerState> results = new HashMap<>(futures.size());
- for (Map.Entry<TopicPartition, KafkaFutureImpl<PartitionProducerState>> entry : futures.entrySet()) {
+ for (Map.Entry<TopicPartition, KafkaFuture<PartitionProducerState>> entry : futures.entrySet()) {
try {
results.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
index edacc9f..278a254 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Collection;
import java.util.HashMap;
@@ -28,9 +27,9 @@ import java.util.concurrent.ExecutionException;
@InterfaceStability.Evolving
public class DescribeTransactionsResult {
- private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures;
+ private final Map<CoordinatorKey, KafkaFuture<TransactionDescription>> futures;
- DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) {
+ DescribeTransactionsResult(Map<CoordinatorKey, KafkaFuture<TransactionDescription>> futures) {
this.futures = futures;
}
@@ -66,7 +65,7 @@ public class DescribeTransactionsResult {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
.thenApply(nil -> {
Map<String, TransactionDescription> results = new HashMap<>(futures.size());
- for (Map.Entry<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> entry : futures.entrySet()) {
+ for (Map.Entry<CoordinatorKey, KafkaFuture<TransactionDescription>> entry : futures.entrySet()) {
try {
results.put(entry.getKey().idValue, entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
index 186c584..548c94c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
@@ -35,9 +35,9 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
*/
@InterfaceStability.Evolving
final public class ElectLeadersResult {
- private final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture;
+ private final KafkaFuture<Map<TopicPartition, Optional<Throwable>>> electionFuture;
- ElectLeadersResult(KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture) {
+ ElectLeadersResult(KafkaFuture<Map<TopicPartition, Optional<Throwable>>> electionFuture) {
this.electionFuture = electionFuture;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f8d9e57..09c168d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3093,7 +3093,8 @@ public class KafkaAdminClient extends AdminClient {
DescribeConsumerGroupsHandler.newFuture(groupIds);
DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(options.includeAuthorizedOperations(), logContext);
invokeDriver(handler, future, options.timeoutMs);
- return new DescribeConsumerGroupsResult(future.all());
+ return new DescribeConsumerGroupsResult(future.all().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
/**
@@ -3293,7 +3294,8 @@ public class KafkaAdminClient extends AdminClient {
DeleteConsumerGroupsHandler.newFuture(groupIds);
DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
invokeDriver(handler, future, options.timeoutMs);
- return new DeleteConsumerGroupsResult(future.all());
+ return new DeleteConsumerGroupsResult(future.all().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index 7732ec9..2d1c612 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -35,7 +35,7 @@ public class ListConsumerGroupsResult {
private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
private final KafkaFutureImpl<Collection<Throwable>> errors;
- ListConsumerGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
+ ListConsumerGroupsResult(KafkaFuture<Collection<Object>> future) {
this.all = new KafkaFutureImpl<>();
this.valid = new KafkaFutureImpl<>();
this.errors = new KafkaFutureImpl<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
index 4b9d4ee..c9670db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
@@ -35,9 +35,9 @@ import java.util.Set;
*/
@InterfaceStability.Evolving
public class ListTransactionsResult {
- private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
+ private final KafkaFuture<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
- ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+ ListTransactionsResult(KafkaFuture<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
this.future = future;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
index 8ddfb83..b0294d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Map;
@@ -76,7 +77,7 @@ public interface AdminApiFuture<K, V> {
* This class can be used when the set of keys is known ahead of time.
*/
class SimpleAdminApiFuture<K, V> implements AdminApiFuture<K, V> {
- private final Map<K, KafkaFutureImpl<V>> futures;
+ private final Map<K, KafkaFuture<V>> futures;
public SimpleAdminApiFuture(Set<K> keys) {
this.futures = keys.stream().collect(Collectors.toMap(
@@ -109,7 +110,8 @@ public interface AdminApiFuture<K, V> {
}
private KafkaFutureImpl<V> futureOrThrow(K key) {
- KafkaFutureImpl<V> future = futures.get(key);
+ // The below typecast is safe because we initialise futures using only KafkaFutureImpl.
+ KafkaFutureImpl<V> future = (KafkaFutureImpl<V>) futures.get(key);
if (future == null) {
throw new IllegalArgumentException("Attempt to complete future for " + key +
", which was not requested");
@@ -118,11 +120,11 @@ public interface AdminApiFuture<K, V> {
}
}
- public Map<K, KafkaFutureImpl<V>> all() {
+ public Map<K, KafkaFuture<V>> all() {
return futures;
}
- public KafkaFutureImpl<V> get(K key) {
+ public KafkaFuture<V> get(K key) {
return futures.get(key);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
index 93a4fa8..6ff393f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
@@ -19,10 +19,10 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.internals.AdminApiDriver.RequestSpec;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.LookupResult;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -498,7 +498,7 @@ class AdminApiDriverTest {
) {
OptionalInt brokerIdOpt = context.driver.keyToBrokerId(key);
assertEquals(OptionalInt.empty(), brokerIdOpt);
- KafkaFutureImpl<Long> future = context.future.all().get(key);
+ KafkaFuture<Long> future = context.future.all().get(key);
assertFalse(future.isDone());
}
@@ -507,7 +507,7 @@ class AdminApiDriverTest {
String key,
Throwable expectedException
) {
- KafkaFutureImpl<Long> future = context.future.all().get(key);
+ KafkaFuture<Long> future = context.future.all().get(key);
assertTrue(future.isCompletedExceptionally());
Throwable exception = assertThrows(ExecutionException.class, future::get);
assertEquals(expectedException, exception.getCause());
@@ -518,7 +518,7 @@ class AdminApiDriverTest {
String key,
Long expected
) {
- KafkaFutureImpl<Long> future = context.future.all().get(key);
+ KafkaFuture<Long> future = context.future.all().get(key);
assertTrue(future.isDone());
try {
assertEquals(expected, future.get());
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index c9b2f0b..76a3855 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -32,7 +32,6 @@ import org.mockito.Mockito._
import org.mockito.ArgumentMatcher
import scala.jdk.CollectionConverters._
-import org.apache.kafka.clients.admin.internals.CoordinatorKey
import org.apache.kafka.common.internals.KafkaFutureImpl
class ConsumerGroupServiceTest {
@@ -112,7 +111,7 @@ class ConsumerGroupServiceTest {
val future = new KafkaFutureImpl[ConsumerGroupDescription]()
future.complete(consumerGroupDescription)
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
- .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)))
+ .thenReturn(new DescribeConsumerGroupsResult(Collections.singletonMap(group, future)))
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
when(admin.listOffsets(
@@ -190,7 +189,7 @@ class ConsumerGroupServiceTest {
new Node(1, "localhost", 9092))
val future = new KafkaFutureImpl[ConsumerGroupDescription]()
future.complete(description)
- new DescribeConsumerGroupsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future))
+ new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))
}
private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {