You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/14 00:31:39 UTC

[kafka] 02/02: KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher (#8457)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 264e2af69b69f60a5efcbd6147a98b0f60f42ffe
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 13 17:20:01 2020 -0700

    KAFKA-9842; Add test case for OffsetsForLeaderEpoch grouping in Fetcher (#8457)
    
    This is a follow-up to #8077. The bug exposed a testing gap in how we group partitions. This patch adds a test case which reproduces the reported problem.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  3 +-
 .../clients/consumer/internals/FetcherTest.java    | 55 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index b359a24..bcfedcd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -796,7 +796,8 @@ public class Fetcher<K, V> implements Closeable {
 
             subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs);
 
-            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
+            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
+                offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPostitions);
             future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
                 @Override
                 public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 72e1370..5a88750 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -3557,6 +3558,60 @@ public class FetcherTest {
     }
 
     @Test
+    public void testOffsetValidationRequestGrouping() {
+        buildFetcher();
+        assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 3,
+            Collections.emptyMap(), singletonMap(topicName, 4),
+            tp -> 5), false, 0L);
+
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+                metadata.currentLeader(tp).leader, Optional.of(4));
+            subscriptions.seekUnvalidated(tp,
+                new SubscriptionState.FetchPosition(0, Optional.of(4), leaderAndEpoch));
+        }
+
+        Set<TopicPartition> allRequestedPartitions = new HashSet<>();
+
+        for (Node node : metadata.fetch().nodes()) {
+            apiVersions.update(node.idString(), NodeApiVersions.create());
+
+            Set<TopicPartition> expectedPartitions = subscriptions.assignedPartitions().stream()
+                .filter(tp ->
+                    metadata.currentLeader(tp).leader.equals(Optional.of(node)))
+                .collect(Collectors.toSet());
+
+            assertTrue(expectedPartitions.stream().noneMatch(allRequestedPartitions::contains));
+            assertTrue(expectedPartitions.size() > 0);
+            allRequestedPartitions.addAll(expectedPartitions);
+
+            Map<TopicPartition, EpochEndOffset> endOffsets = expectedPartitions.stream().collect(Collectors.toMap(
+                Function.identity(),
+                tp -> new EpochEndOffset(Errors.NONE, 4, 0)
+            ));
+
+            OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(endOffsets);
+            client.prepareResponseFrom(new MockClient.RequestMatcher() {
+                @Override
+                public boolean matches(AbstractRequest body) {
+                    OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest) body;
+                    return expectedPartitions.equals(request.epochsByTopicPartition().keySet());
+                }
+            }, response, node);
+        }
+
+        assertEquals(subscriptions.assignedPartitions(), allRequestedPartitions);
+
+        fetcher.validateOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertTrue(subscriptions.assignedPartitions()
+            .stream().noneMatch(subscriptions::awaitingValidation));
+    }
+
+    @Test
     public void testOffsetValidationAwaitsNodeApiVersion() {
         buildFetcher();
         assignFromUser(singleton(tp0));