You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/14 00:31:39 UTC
[kafka] 02/02: KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher (#8457)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 264e2af69b69f60a5efcbd6147a98b0f60f42ffe
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 13 17:20:01 2020 -0700
KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher (#8457)
This is a follow-up to #8077. The bug exposed a testing gap in how we group partitions. This patch adds a test case which reproduces the reported problem.
Reviewers: David Arthur <mu...@gmail.com>
---
.../kafka/clients/consumer/internals/Fetcher.java | 3 +-
.../clients/consumer/internals/FetcherTest.java | 55 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index b359a24..bcfedcd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -796,7 +796,8 @@ public class Fetcher<K, V> implements Closeable {
subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs);
- RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
+ RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
+ offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
@Override
public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 72e1370..5a88750 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
@@ -3557,6 +3558,60 @@ public class FetcherTest {
}
@Test
+ public void testOffsetValidationRequestGrouping() {
+ buildFetcher();
+ assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+
+ metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 3,
+ Collections.emptyMap(), singletonMap(topicName, 4),
+ tp -> 5), false, 0L);
+
+ for (TopicPartition tp : subscriptions.assignedPartitions()) {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+ metadata.currentLeader(tp).leader, Optional.of(4));
+ subscriptions.seekUnvalidated(tp,
+ new SubscriptionState.FetchPosition(0, Optional.of(4), leaderAndEpoch));
+ }
+
+ Set<TopicPartition> allRequestedPartitions = new HashSet<>();
+
+ for (Node node : metadata.fetch().nodes()) {
+ apiVersions.update(node.idString(), NodeApiVersions.create());
+
+ Set<TopicPartition> expectedPartitions = subscriptions.assignedPartitions().stream()
+ .filter(tp ->
+ metadata.currentLeader(tp).leader.equals(Optional.of(node)))
+ .collect(Collectors.toSet());
+
+ assertTrue(expectedPartitions.stream().noneMatch(allRequestedPartitions::contains));
+ assertTrue(expectedPartitions.size() > 0);
+ allRequestedPartitions.addAll(expectedPartitions);
+
+ Map<TopicPartition, EpochEndOffset> endOffsets = expectedPartitions.stream().collect(Collectors.toMap(
+ Function.identity(),
+ tp -> new EpochEndOffset(Errors.NONE, 4, 0)
+ ));
+
+ OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(endOffsets);
+ client.prepareResponseFrom(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest) body;
+ return expectedPartitions.equals(request.epochsByTopicPartition().keySet());
+ }
+ }, response, node);
+ }
+
+ assertEquals(subscriptions.assignedPartitions(), allRequestedPartitions);
+
+ fetcher.validateOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ assertTrue(subscriptions.assignedPartitions()
+ .stream().noneMatch(subscriptions::awaitingValidation));
+ }
+
+ @Test
public void testOffsetValidationAwaitsNodeApiVersion() {
buildFetcher();
assignFromUser(singleton(tp0));