You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2023/10/31 15:05:51 UTC

(kafka) branch trunk updated: MINOR: Remove ambiguous constructor (#14598)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47b468bb8ce MINOR: Remove ambiguous constructor (#14598)
47b468bb8ce is described below

commit 47b468bb8ce893e7b5d867adf0da1470fae7d638
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Tue Oct 31 08:05:43 2023 -0700

    MINOR: Remove ambiguous constructor (#14598)
    
    One of the review comments on https://issues.apache.org/jira/browse/KAFKA-15534 (#14532) noted
    that the constructor taking a BiConsumer is rather confusing.
    
    This change removes that constructor and lets the request take a callback via the whenComplete method instead.
    
    Reviewers: Kirk True <kt...@confluent.io>, Bruno Cadonna <ca...@apache.org>
---
 .../consumer/internals/CommitRequestManager.java     | 12 ++++--------
 .../internals/CoordinatorRequestManager.java         |  4 +---
 .../consumer/internals/FetchRequestManager.java      | 15 ++++++++-------
 .../consumer/internals/HeartbeatRequestManager.java  |  3 +--
 .../consumer/internals/NetworkClientDelegate.java    | 20 ++++++--------------
 .../consumer/internals/OffsetsRequestManager.java    |  4 ++--
 .../internals/TopicMetadataRequestManager.java       |  4 +---
 .../consumer/internals/CommitRequestManagerTest.java |  4 ++--
 .../internals/CoordinatorRequestManagerTest.java     |  2 +-
 .../internals/HeartbeatRequestManagerTest.java       |  2 +-
 10 files changed, 27 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index d3004dd9227..b8833bfac23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -215,10 +215,8 @@ public class CommitRequestManager implements RequestManager {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
-                builder,
-                coordinatorRequestManager.coordinator(),
-                (response, throwable) -> {
+            return new NetworkClientDelegate.UnsentRequest(builder, coordinatorRequestManager.coordinator())
+                .whenComplete((response, throwable) -> {
                     if (throwable == null) {
                         future.complete(null);
                     } else {
@@ -252,10 +250,8 @@ public class CommitRequestManager implements RequestManager {
                     true,
                     new ArrayList<>(this.requestedPartitions),
                     throwOnFetchStableOffsetUnsupported);
-            return new NetworkClientDelegate.UnsentRequest(
-                builder,
-                coordinatorRequestManager.coordinator(),
-                (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
+            return new NetworkClientDelegate.UnsentRequest(builder, coordinatorRequestManager.coordinator())
+                .whenComplete((r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
         }
 
         public void onResponse(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 3efe45499f2..78f6b3d5e27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -115,7 +115,7 @@ public class CoordinatorRequestManager implements RequestManager {
             Optional.empty()
         );
 
-        unsentRequest.future().whenComplete((clientResponse, throwable) -> {
+        return unsentRequest.whenComplete((clientResponse, throwable) -> {
             if (clientResponse != null) {
                 FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
                 onResponse(clientResponse.receivedTimeMs(), response);
@@ -123,8 +123,6 @@ public class CoordinatorRequestManager implements RequestManager {
                 onFailedResponse(unsentRequest.handler().completionTimeMs(), throwable);
             }
         });
-
-        return unsentRequest;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
index f98d7b3d9fa..3af0c1441f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
@@ -16,12 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
@@ -31,6 +25,13 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
 /**
  * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
  * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
@@ -109,7 +110,7 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
                     successHandler.handle(fetchTarget, data, clientResponse);
             };
 
-            return new UnsentRequest(request, fetchTarget, responseHandler);
+            return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
         }).collect(Collectors.toList());
 
         return new PollResult(requests);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 2172313054d..a0a4ca97e16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -183,14 +183,13 @@ public class HeartbeatRequestManager implements RequestManager {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
             new ConsumerGroupHeartbeatRequest.Builder(data),
             coordinatorRequestManager.coordinator());
-        request.future().whenComplete((response, exception) -> {
+        return request.whenComplete((response, exception) -> {
             if (response != null) {
                 onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
             } else {
                 onFailure(exception, request.handler().completionTimeMs());
             }
         });
-        return request;
     }
 
     private void onFailure(final Throwable exception, final long responseTimeMs) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 47485ed46ca..83f81ed4e86 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -275,20 +275,7 @@ public class NetworkClientDelegate implements AutoCloseable {
             this.handler = new FutureCompletionHandler();
         }
 
-        public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
-                             final Optional<Node> node,
-                             final BiConsumer<ClientResponse, Throwable> callback) {
-            this(requestBuilder, node);
-            this.handler.future().whenComplete(callback);
-        }
-
-        public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
-                             final Node node,
-                             final BiConsumer<ClientResponse, Throwable> callback) {
-            this(requestBuilder, Optional.of(node), callback);
-        }
-
-        public void setTimer(final Time time, final long requestTimeoutMs) {
+        void setTimer(final Time time, final long requestTimeoutMs) {
             this.timer = time.timer(requestTimeoutMs);
         }
 
@@ -300,6 +287,11 @@ public class NetworkClientDelegate implements AutoCloseable {
             return handler;
         }
 
+        UnsentRequest whenComplete(BiConsumer<ClientResponse, Throwable> callback) {
+            handler.future().whenComplete(callback);
+            return this;
+        }
+
         AbstractRequest.Builder<?> requestBuilder() {
             return requestBuilder;
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 700e2ab6e17..78f6bdb1aa1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -356,7 +356,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
                 Optional.ofNullable(node));
         unsentRequests.add(unsentRequest);
         CompletableFuture<ListOffsetResult> result = new CompletableFuture<>();
-        unsentRequest.future().whenComplete((response, error) -> {
+        unsentRequest.whenComplete((response, error) -> {
             if (error != null) {
                 log.debug("Sending ListOffset request {} to broker {} failed",
                         builder,
@@ -505,7 +505,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
                 Optional.ofNullable(node));
         unsentRequests.add(unsentRequest);
         CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> result = new CompletableFuture<>();
-        unsentRequest.future().whenComplete((response, error) -> {
+        unsentRequest.whenComplete((response, error) -> {
             if (error != null) {
                 log.debug("Sending OffsetsForLeaderEpoch request {} to broker {} failed",
                         builder,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
index f62794280f4..8429e80805b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
@@ -152,15 +152,13 @@ public class TopicMetadataRequestManager implements RequestManager {
                 request,
                 Optional.empty());
 
-            unsent.future().whenComplete((response, exception) -> {
+            return unsent.whenComplete((response, exception) -> {
                 if (response == null) {
                     handleError(exception, unsent.handler().completionTimeMs());
                 } else {
                     handleResponse(response);
                 }
             });
-
-            return unsent;
         }
 
         private void handleError(final Throwable exception,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index bb58c9423c5..7703c525680 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -291,7 +291,7 @@ public class CommitRequestManagerTest {
         topicPartitionData.put(tp1, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
         topicPartitionData.put(tp2, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
 
-        res.unsentRequests.get(0).future().complete(buildOffsetFetchClientResponse(
+        res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(
                 res.unsentRequests.get(0),
                 topicPartitionData,
                 Errors.NONE));
@@ -329,7 +329,7 @@ public class CommitRequestManagerTest {
 
         NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
-        res.unsentRequests.get(0).future().complete(buildOffsetFetchClientResponse(res.unsentRequests.get(0),
+        res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(res.unsentRequests.get(0),
             partitions, error));
         res = commitRequestManger.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 23ee97daf45..d7ad1b55738 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -180,7 +180,7 @@ public class CoordinatorRequestManagerTest {
         assertEquals(1, res.unsentRequests.size());
 
         NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
-        unsentRequest.future().complete(buildResponse(unsentRequest, error));
+        unsentRequest.handler().onComplete(buildResponse(unsentRequest, error));
 
         boolean expectCoordinatorFound = error == Errors.NONE;
         assertEquals(expectCoordinatorFound, coordinatorManager.coordinator().isPresent());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 56c0ba1d50d..d9355ce36a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -189,7 +189,7 @@ public class HeartbeatRequestManagerTest {
         when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
         NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, result.unsentRequests.size());
-        result.unsentRequests.get(0).future().completeExceptionally(new KafkaException("fatal"));
+        result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new KafkaException("fatal"));
         verify(membershipManager).transitionToFailed();
         verify(backgroundEventHandler).add(any());
     }