You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2023/10/31 15:05:51 UTC
(kafka) branch trunk updated: MINOR: Remove ambiguous constructor (#14598)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 47b468bb8ce MINOR: Remove ambiguous constructor (#14598)
47b468bb8ce is described below
commit 47b468bb8ce893e7b5d867adf0da1470fae7d638
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Tue Oct 31 08:05:43 2023 -0700
MINOR: Remove ambiguous constructor (#14598)
One of the review comments on https://issues.apache.org/jira/browse/KAFKA-15534 (#14532) was
that the constructor taking a BiConsumer is rather confusing. This commit removes that constructor and lets the request take a callback via the whenComplete method.
Reviewers: Kirk True <kt...@confluent.io>, Bruno Cadonna <ca...@apache.org>
---
.../consumer/internals/CommitRequestManager.java | 12 ++++--------
.../internals/CoordinatorRequestManager.java | 4 +---
.../consumer/internals/FetchRequestManager.java | 15 ++++++++-------
.../consumer/internals/HeartbeatRequestManager.java | 3 +--
.../consumer/internals/NetworkClientDelegate.java | 20 ++++++--------------
.../consumer/internals/OffsetsRequestManager.java | 4 ++--
.../internals/TopicMetadataRequestManager.java | 4 +---
.../consumer/internals/CommitRequestManagerTest.java | 4 ++--
.../internals/CoordinatorRequestManagerTest.java | 2 +-
.../internals/HeartbeatRequestManagerTest.java | 2 +-
10 files changed, 27 insertions(+), 43 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index d3004dd9227..b8833bfac23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -215,10 +215,8 @@ public class CommitRequestManager implements RequestManager {
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
- return new NetworkClientDelegate.UnsentRequest(
- builder,
- coordinatorRequestManager.coordinator(),
- (response, throwable) -> {
+ return new NetworkClientDelegate.UnsentRequest(builder, coordinatorRequestManager.coordinator())
+ .whenComplete((response, throwable) -> {
if (throwable == null) {
future.complete(null);
} else {
@@ -252,10 +250,8 @@ public class CommitRequestManager implements RequestManager {
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
- return new NetworkClientDelegate.UnsentRequest(
- builder,
- coordinatorRequestManager.coordinator(),
- (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+ return new NetworkClientDelegate.UnsentRequest(builder, coordinatorRequestManager.coordinator())
+ .whenComplete((r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
}
public void onResponse(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 3efe45499f2..78f6b3d5e27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -115,7 +115,7 @@ public class CoordinatorRequestManager implements RequestManager {
Optional.empty()
);
- unsentRequest.future().whenComplete((clientResponse, throwable) -> {
+ return unsentRequest.whenComplete((clientResponse, throwable) -> {
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response);
@@ -123,8 +123,6 @@ public class CoordinatorRequestManager implements RequestManager {
onFailedResponse(unsentRequest.handler().completionTimeMs(), throwable);
}
});
-
- return unsentRequest;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
index f98d7b3d9fa..3af0c1441f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
@@ -16,12 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
@@ -31,6 +25,13 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
/**
* {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
* {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
@@ -109,7 +110,7 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
successHandler.handle(fetchTarget, data, clientResponse);
};
- return new UnsentRequest(request, fetchTarget, responseHandler);
+ return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
}).collect(Collectors.toList());
return new PollResult(requests);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 2172313054d..a0a4ca97e16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -183,14 +183,13 @@ public class HeartbeatRequestManager implements RequestManager {
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new ConsumerGroupHeartbeatRequest.Builder(data),
coordinatorRequestManager.coordinator());
- request.future().whenComplete((response, exception) -> {
+ return request.whenComplete((response, exception) -> {
if (response != null) {
onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
} else {
onFailure(exception, request.handler().completionTimeMs());
}
});
- return request;
}
private void onFailure(final Throwable exception, final long responseTimeMs) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 47485ed46ca..83f81ed4e86 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -275,20 +275,7 @@ public class NetworkClientDelegate implements AutoCloseable {
this.handler = new FutureCompletionHandler();
}
- public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
- final Optional<Node> node,
- final BiConsumer<ClientResponse, Throwable> callback) {
- this(requestBuilder, node);
- this.handler.future().whenComplete(callback);
- }
-
- public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
- final Node node,
- final BiConsumer<ClientResponse, Throwable> callback) {
- this(requestBuilder, Optional.of(node), callback);
- }
-
- public void setTimer(final Time time, final long requestTimeoutMs) {
+ void setTimer(final Time time, final long requestTimeoutMs) {
this.timer = time.timer(requestTimeoutMs);
}
@@ -300,6 +287,11 @@ public class NetworkClientDelegate implements AutoCloseable {
return handler;
}
+ UnsentRequest whenComplete(BiConsumer<ClientResponse, Throwable> callback) {
+ handler.future().whenComplete(callback);
+ return this;
+ }
+
AbstractRequest.Builder<?> requestBuilder() {
return requestBuilder;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 700e2ab6e17..78f6bdb1aa1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -356,7 +356,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
Optional.ofNullable(node));
unsentRequests.add(unsentRequest);
CompletableFuture<ListOffsetResult> result = new CompletableFuture<>();
- unsentRequest.future().whenComplete((response, error) -> {
+ unsentRequest.whenComplete((response, error) -> {
if (error != null) {
log.debug("Sending ListOffset request {} to broker {} failed",
builder,
@@ -505,7 +505,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
Optional.ofNullable(node));
unsentRequests.add(unsentRequest);
CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> result = new CompletableFuture<>();
- unsentRequest.future().whenComplete((response, error) -> {
+ unsentRequest.whenComplete((response, error) -> {
if (error != null) {
log.debug("Sending OffsetsForLeaderEpoch request {} to broker {} failed",
builder,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
index f62794280f4..8429e80805b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
@@ -152,15 +152,13 @@ public class TopicMetadataRequestManager implements RequestManager {
request,
Optional.empty());
- unsent.future().whenComplete((response, exception) -> {
+ return unsent.whenComplete((response, exception) -> {
if (response == null) {
handleError(exception, unsent.handler().completionTimeMs());
} else {
handleResponse(response);
}
});
-
- return unsent;
}
private void handleError(final Throwable exception,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index bb58c9423c5..7703c525680 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -291,7 +291,7 @@ public class CommitRequestManagerTest {
topicPartitionData.put(tp1, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
topicPartitionData.put(tp2, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
- res.unsentRequests.get(0).future().complete(buildOffsetFetchClientResponse(
+ res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(
res.unsentRequests.get(0),
topicPartitionData,
Errors.NONE));
@@ -329,7 +329,7 @@ public class CommitRequestManagerTest {
NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
- res.unsentRequests.get(0).future().complete(buildOffsetFetchClientResponse(res.unsentRequests.get(0),
+ res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(res.unsentRequests.get(0),
partitions, error));
res = commitRequestManger.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 23ee97daf45..d7ad1b55738 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -180,7 +180,7 @@ public class CoordinatorRequestManagerTest {
assertEquals(1, res.unsentRequests.size());
NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
- unsentRequest.future().complete(buildResponse(unsentRequest, error));
+ unsentRequest.handler().onComplete(buildResponse(unsentRequest, error));
boolean expectCoordinatorFound = error == Errors.NONE;
assertEquals(expectCoordinatorFound, coordinatorManager.coordinator().isPresent());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 56c0ba1d50d..d9355ce36a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -189,7 +189,7 @@ public class HeartbeatRequestManagerTest {
when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
- result.unsentRequests.get(0).future().completeExceptionally(new KafkaException("fatal"));
+ result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new KafkaException("fatal"));
verify(membershipManager).transitionToFailed();
verify(backgroundEventHandler).add(any());
}