You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/27 21:25:45 UTC
[kafka] branch 2.3 updated: KAFKA-8437; Await node api versions before checking if offset validation is possible (#6823)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new cf19c63 KAFKA-8437; Await node api versions before checking if offset validation is possible (#6823)
cf19c63 is described below
commit cf19c631da89031a66084b4ad49542a7330c9fb3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 27 14:22:39 2019 -0700
KAFKA-8437; Await node api versions before checking if offset validation is possible (#6823)
The consumer should await API version information before determining whether the broker supports offset validation. In KAFKA-8422, we skip the validation if we don't yet have API version information for the node, which means we always skip validation the first time we connect to a node. This bug was detected by the failing system test `tests/client/truncation_test.py`, which passes again with this fix.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/clients/consumer/internals/Fetcher.java | 15 ++++----
.../java/org/apache/kafka/clients/MockClient.java | 4 +++
.../clients/consumer/internals/FetcherTest.java | 42 ++++++++++++++++++++++
3 files changed, 54 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index aa2936a..59bc14c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -719,12 +719,7 @@ public class Fetcher<K, V> implements Closeable {
}
}
- private boolean hasUsableOffsetForLeaderEpochVersion(Node node) {
- NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
-
- if (nodeApiVersions == null)
- return false;
-
+ private boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
ApiVersionsResponse.ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
if (apiVersion == null)
return false;
@@ -752,7 +747,13 @@ public class Fetcher<K, V> implements Closeable {
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().currentLeader));
- if (!hasUsableOffsetForLeaderEpochVersion(node)) {
+ NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+ if (nodeApiVersions == null) {
+ client.tryConnect(node);
+ return;
+ }
+
+ if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
"support the required protocol version (introduced in Kafka 2.3)",
cachedLeaderAndEpochs.keySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 751f245..b23005e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -97,6 +97,10 @@ public class MockClient implements KafkaClient {
this.metadataUpdater = metadataUpdater;
}
+ public boolean isConnected(String idString) {
+ return connectionState(idString).state == ConnectionState.State.CONNECTED;
+ }
+
private ConnectionState connectionState(String idString) {
ConnectionState connectionState = connections.get(idString);
if (connectionState == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 69036ca..754b46e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3209,6 +3209,48 @@ public class FetcherTest {
}
@Test
+ public void testOffsetValidationAwaitsNodeApiVersion() {
+ buildFetcher();
+ assignFromUser(singleton(tp0));
+
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put(tp0.topic(), 4);
+
+ final int epochOne = 1;
+
+ metadata.update(TestUtils.metadataUpdateWith("dummy", 1,
+ Collections.emptyMap(), partitionCounts, tp -> epochOne), 0L);
+
+ Node node = metadata.fetch().nodes().get(0);
+ assertFalse(client.isConnected(node.idString()));
+
+ // Seek with a position and leader+epoch
+ Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+ metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+ subscriptions.seekAndValidate(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
+ assertFalse(client.isConnected(node.idString()));
+ assertTrue(subscriptions.awaitingValidation(tp0));
+
+ // No version information is initially available, but the node is now connected
+ fetcher.validateOffsetsIfNeeded();
+ assertTrue(subscriptions.awaitingValidation(tp0));
+ assertTrue(client.isConnected(node.idString()));
+ apiVersions.update(node.idString(), NodeApiVersions.create());
+
+ // On the next call, the OffsetForLeaderEpoch request is sent and validation completes
+ Map<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<>();
+ endOffsetMap.put(tp0, new EpochEndOffset(Errors.NONE, epochOne, 30L));
+ OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap);
+ client.prepareResponseFrom(resp, node);
+
+ fetcher.validateOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ assertFalse(subscriptions.awaitingValidation(tp0));
+ assertEquals(20L, subscriptions.position(tp0).offset);
+ }
+
+ @Test
public void testOffsetValidationSkippedForOldBroker() {
// Old brokers may require CLUSTER permission to use the OffsetForLeaderEpoch API,
// so we should skip offset validation and not send the request.