You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/14 00:31:38 UTC

[kafka] 01/02: KAFKA-9583; Use topic-partitions grouped by node to send OffsetsForLeaderEpoch requests (#8077)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3ee844c91f0a961b0b53bec59604da7f16604241
Author: Dezhi “Andy” Fang <an...@gmail.com>
AuthorDate: Wed Apr 8 23:46:45 2020 -0700

    KAFKA-9583; Use topic-partitions grouped by node to send OffsetsForLeaderEpoch requests (#8077)
    
    In `validateOffsetsAsync` in t he consumer, we group the requests by leader node for efficiency. The list of topic-partitions are grouped from `partitionsToValidate` (all partitions) to `node` => `fetchPostitions` (partitions by node). However, when actually sending the request with `OffsetsForLeaderEpochClient`, we use `partitionsToValidate`, which is the list of all topic-partitions passed into `validateOffsetsAsync`. This results in extra partitions being included in the request se [...]
    
    This PR fixes the issue by using `fetchPositions`, which is the proper list of partitions that we should send in the request. Additionally, a small typo of API name in `OffsetsForLeaderEpochClient` is corrected (it originally referenced `LisfOffsets` as the API name).
    
    Reviewers: David Arthur <mu...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java  | 2 +-
 .../kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3537f60..b359a24 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -796,7 +796,7 @@ public class Fetcher<K, V> implements Closeable {
 
             subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs);
 
-            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate);
+            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
             future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
                 @Override
                 public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 480d0ea..7e372c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -96,7 +96,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
                     partitionsToRetry.add(topicPartition);
                     break;
                 case UNKNOWN_TOPIC_OR_PARTITION:
-                    logger().warn("Received unknown topic or partition error in ListOffset request for partition {}",
+                    logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}",
                             topicPartition);
                     partitionsToRetry.add(topicPartition);
                     break;