You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/27 21:25:45 UTC

[kafka] branch 2.3 updated: KAFKA-8437; Await node api versions before checking if offset validation is possible (#6823)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new cf19c63  KAFKA-8437; Await node api versions before checking if offset validation is possible (#6823)
cf19c63 is described below

commit cf19c631da89031a66084b4ad49542a7330c9fb3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 27 14:22:39 2019 -0700

    KAFKA-8437; Await node api versions before checking if offset validation is possible (#6823)
    
    The consumer should await api version information before determining whether the broker supports offset validation. In KAFKA-8422, we skip the validation if we don't have api version information, which means we always skip validation the first time we connect to a node. This bug was detected by the failing system test `tests/client/truncation_test.py`. The test passes again with this fix.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 15 ++++----
 .../java/org/apache/kafka/clients/MockClient.java  |  4 +++
 .../clients/consumer/internals/FetcherTest.java    | 42 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index aa2936a..59bc14c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -719,12 +719,7 @@ public class Fetcher<K, V> implements Closeable {
         }
     }
 
-    private boolean hasUsableOffsetForLeaderEpochVersion(Node node) {
-        NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
-
-        if (nodeApiVersions == null)
-            return false;
-
+    private boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
         ApiVersionsResponse.ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
         if (apiVersion == null)
             return false;
@@ -752,7 +747,13 @@ public class Fetcher<K, V> implements Closeable {
                     .stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().currentLeader));
 
-            if (!hasUsableOffsetForLeaderEpochVersion(node)) {
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                 log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
                                 "support the required protocol version (introduced in Kafka 2.3)",
                         cachedLeaderAndEpochs.keySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 751f245..b23005e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -97,6 +97,10 @@ public class MockClient implements KafkaClient {
         this.metadataUpdater = metadataUpdater;
     }
 
+    public boolean isConnected(String idString) {
+        return connectionState(idString).state == ConnectionState.State.CONNECTED;
+    }
+
     private ConnectionState connectionState(String idString) {
         ConnectionState connectionState = connections.get(idString);
         if (connectionState == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 69036ca..754b46e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3209,6 +3209,48 @@ public class FetcherTest {
     }
 
     @Test
+    public void testOffsetValidationAwaitsNodeApiVersion() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        final int epochOne = 1;
+
+        metadata.update(TestUtils.metadataUpdateWith("dummy", 1,
+                Collections.emptyMap(), partitionCounts, tp -> epochOne), 0L);
+
+        Node node = metadata.fetch().nodes().get(0);
+        assertFalse(client.isConnected(node.idString()));
+
+        // Seek with a position and leader+epoch
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+        subscriptions.seekAndValidate(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
+        assertFalse(client.isConnected(node.idString()));
+        assertTrue(subscriptions.awaitingValidation(tp0));
+
+        // No version information is initially available, but the node is now connected
+        fetcher.validateOffsetsIfNeeded();
+        assertTrue(subscriptions.awaitingValidation(tp0));
+        assertTrue(client.isConnected(node.idString()));
+        apiVersions.update(node.idString(), NodeApiVersions.create());
+
+        // On the next call, the OffsetForLeaderEpoch request is sent and validation completes
+        Map<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<>();
+        endOffsetMap.put(tp0, new EpochEndOffset(Errors.NONE, epochOne, 30L));
+        OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap);
+        client.prepareResponseFrom(resp, node);
+
+        fetcher.validateOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertFalse(subscriptions.awaitingValidation(tp0));
+        assertEquals(20L, subscriptions.position(tp0).offset);
+    }
+
+    @Test
     public void testOffsetValidationSkippedForOldBroker() {
         // Old brokers may require CLUSTER permission to use the OffsetForLeaderEpoch API,
         // so we should skip offset validation and not send the request.