Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/14 00:31:38 UTC
[kafka] 01/02: KAFKA-9583; Use topic-partitions grouped by node to send OffsetsForLeaderEpoch requests (#8077)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3ee844c91f0a961b0b53bec59604da7f16604241
Author: Dezhi “Andy” Fang <an...@gmail.com>
AuthorDate: Wed Apr 8 23:46:45 2020 -0700
KAFKA-9583; Use topic-partitions grouped by node to send OffsetsForLeaderEpoch requests (#8077)
In `validateOffsetsAsync` in the consumer, we group the requests by leader node for efficiency. The topic-partitions are regrouped from `partitionsToValidate` (all partitions) into a mapping of `node` => `fetchPostitions` (the partitions led by that node). However, when actually sending the request with `OffsetsForLeaderEpochClient`, we use `partitionsToValidate`, which is the list of all topic-partitions passed into `validateOffsetsAsync`. This results in extra partitions being included in the request se [...]
This PR fixes the issue by passing `fetchPostitions`, the per-node set of partitions, to the request instead of `partitionsToValidate`. Additionally, the API name in a warning logged by `OffsetsForLeaderEpochClient` is corrected (the message originally referenced `ListOffset` rather than `OffsetsForLeaderEpoch`).
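The difference between the two behaviors can be illustrated with a minimal, self-contained sketch. It does not use the Kafka client classes: partitions and nodes are plain strings, and `GroupByNodeSketch`, `leaderOf`, `payloadPerNode`, and `sendOnlyGrouped` are hypothetical names introduced only for illustration. It also assumes every partition has a known leader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupByNodeSketch {

    /** Groups partition -> position entries by the leader node of each partition. */
    static Map<String, Map<String, Long>> groupByLeader(Map<String, Long> partitionsToValidate,
                                                        Map<String, String> leaderOf) {
        Map<String, Map<String, Long>> byNode = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : partitionsToValidate.entrySet()) {
            // Assumption: leaderOf contains an entry for every partition.
            String node = leaderOf.get(entry.getKey());
            byNode.computeIfAbsent(node, n -> new LinkedHashMap<>())
                  .put(entry.getKey(), entry.getValue());
        }
        return byNode;
    }

    /**
     * Builds the payload each node would receive (hypothetical helper).
     * sendOnlyGrouped = false mimics the behavior before the fix: every node gets all partitions.
     * sendOnlyGrouped = true mimics the behavior after the fix: each node gets only its own partitions.
     */
    static Map<String, Map<String, Long>> payloadPerNode(Map<String, Long> partitionsToValidate,
                                                         Map<String, String> leaderOf,
                                                         boolean sendOnlyGrouped) {
        Map<String, Map<String, Long>> payloads = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Long>> group
                : groupByLeader(partitionsToValidate, leaderOf).entrySet()) {
            // Variable name mirrors the spelling used in Fetcher.java.
            Map<String, Long> fetchPostitions = group.getValue();
            payloads.put(group.getKey(), sendOnlyGrouped ? fetchPostitions : partitionsToValidate);
        }
        return payloads;
    }

    public static void main(String[] args) {
        Map<String, Long> partitionsToValidate = new LinkedHashMap<>();
        partitionsToValidate.put("topic-0", 10L);
        partitionsToValidate.put("topic-1", 20L);
        partitionsToValidate.put("topic-2", 30L);

        Map<String, String> leaderOf = new LinkedHashMap<>();
        leaderOf.put("topic-0", "node-1");
        leaderOf.put("topic-1", "node-2");
        leaderOf.put("topic-2", "node-1");

        System.out.println("before fix: " + payloadPerNode(partitionsToValidate, leaderOf, false));
        System.out.println("after fix:  " + payloadPerNode(partitionsToValidate, leaderOf, true));
    }
}
```

In this sketch, the "before" payload for each node contains all three partitions, while the "after" payload contains only the partitions that node leads, which is the behavior the one-line change in `Fetcher.java` is meant to restore.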
Reviewers: David Arthur <mu...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3537f60..b359a24 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -796,7 +796,7 @@ public class Fetcher<K, V> implements Closeable {
subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs);
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate);
+ RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
@Override
public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 480d0ea..7e372c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -96,7 +96,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
partitionsToRetry.add(topicPartition);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
- logger().warn("Received unknown topic or partition error in ListOffset request for partition {}",
+ logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}",
topicPartition);
partitionsToRetry.add(topicPartition);
break;