You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/05/27 21:24:54 UTC
[kafka] branch 2.5 updated: KAFKA-9724 Newer clients not always
sending fetch request to older brokers (#8376)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 31dd984 KAFKA-9724 Newer clients not always sending fetch request to older brokers (#8376)
31dd984 is described below
commit 31dd984cbe5bd14f52e488712b737887c691b7d6
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed May 27 17:24:17 2020 -0400
KAFKA-9724 Newer clients not always sending fetch request to older brokers (#8376)
Newer clients were getting stuck entering the validation phase even when a broker didn't support it. This commit will bypass the AWAITING_VALIDATION state when the broker is on an older version of the OffsetsForLeaderEpoch RPC.
---
.../java/org/apache/kafka/clients/Metadata.java | 55 +++++------
.../kafka/clients/consumer/internals/Fetcher.java | 9 +-
.../consumer/internals/SubscriptionState.java | 29 +++++-
.../org/apache/kafka/clients/MetadataTest.java | 73 ++++++++++-----
.../clients/consumer/internals/FetcherTest.java | 104 ++++++++++++++++++---
.../consumer/internals/SubscriptionStateTest.java | 46 ++++++++-
6 files changed, 243 insertions(+), 73 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 7116e2e..d609c2a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -41,7 +41,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
@@ -156,14 +155,35 @@ public class Metadata implements Closeable {
}
/**
- * Request an update for the partition metadata iff the given leader epoch is newer than the last seen leader epoch
+ * Request an update for the partition metadata iff we have seen a newer leader epoch. This is called by the client
+ * any time it handles a response from the broker that includes leader epoch, except for UpdateMetadata which
+ * follows a different code path ({@link #update}).
+ *
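+ * @param topicPartition topic+partition to update the epoch for
+ * @param leaderEpoch the leader epoch seen in the broker response (must be non-negative)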
+ * @return true if we updated the last seen epoch, false otherwise
*/
public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition topicPartition, int leaderEpoch) {
Objects.requireNonNull(topicPartition, "TopicPartition cannot be null");
if (leaderEpoch < 0)
throw new IllegalArgumentException("Invalid leader epoch " + leaderEpoch + " (must be non-negative)");
- boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch -> leaderEpoch > oldEpoch);
+ Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
+ log.trace("Determining if we should replace existing epoch {} with new epoch {} for partition {}", oldEpoch, leaderEpoch, topicPartition);
+
+ final boolean updated;
+ if (oldEpoch == null) {
+ log.debug("Not replacing null epoch with new epoch {} for partition {}", leaderEpoch, topicPartition);
+ updated = false;
+ } else if (leaderEpoch > oldEpoch) {
+ log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, leaderEpoch, topicPartition);
+ lastSeenLeaderEpochs.put(topicPartition, leaderEpoch);
+ updated = true;
+ } else {
+ log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, leaderEpoch, topicPartition);
+ updated = false;
+ }
+
this.needFullUpdate = this.needFullUpdate || updated;
return updated;
}
@@ -173,29 +193,6 @@ public class Metadata implements Closeable {
}
/**
- * Conditionally update the leader epoch for a partition
- *
- * @param topicPartition topic+partition to update the epoch for
- * @param epoch the new epoch
- * @param epochTest a predicate to determine if the old epoch should be replaced
- * @return true if the epoch was updated, false otherwise
- */
- private synchronized boolean updateLastSeenEpoch(TopicPartition topicPartition,
- int epoch,
- Predicate<Integer> epochTest) {
- Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
- log.trace("Determining if we should replace existing epoch {} with new epoch {}", oldEpoch, epoch);
- if (oldEpoch == null || epochTest.test(oldEpoch)) {
- log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, epoch, topicPartition);
- lastSeenLeaderEpochs.put(topicPartition, epoch);
- return true;
- } else {
- log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition);
- return false;
- }
- }
-
- /**
* Check whether an update has been explicitly requested.
*
* @return true if an update was requested, false otherwise
@@ -373,10 +370,14 @@ public class Metadata implements Closeable {
if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
int newEpoch = partitionMetadata.leaderEpoch.get();
// If the received leader epoch is at least the same as the previous one, update the metadata
- if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch)) {
+ Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
+ if (currentEpoch == null || newEpoch >= currentEpoch) {
+ log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
+ lastSeenLeaderEpochs.put(tp, newEpoch);
return Optional.of(partitionMetadata);
} else {
// Otherwise ignore the new metadata and use the previously cached info
+ log.debug("Got metadata for an older epoch {} (current is {}) for partition {}, not updating", newEpoch, currentEpoch, tp);
return cache.partitionMetadata(tp);
}
} else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bcfedcd..ea1b7e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -486,7 +486,7 @@ public class Fetcher<K, V> implements Closeable {
// Validate each partition against the current leader and epoch
subscriptions.assignedPartitions().forEach(topicPartition -> {
ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
- subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch);
+ subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
});
// Collect positions needing validation, with backoff
@@ -754,7 +754,7 @@ public class Fetcher<K, V> implements Closeable {
}
}
- private boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
+ static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
if (apiVersion == null)
return false;
@@ -1099,8 +1099,9 @@ public class Fetcher<K, V> implements Closeable {
Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
// Ensure the position has an up-to-date leader
- subscriptions.assignedPartitions().forEach(
- tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.currentLeader(tp)));
+ subscriptions.assignedPartitions().forEach(tp ->
+ subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp))
+ );
long currentTimeMs = time.milliseconds();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6568c91..bd84777 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +47,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion;
+
/**
* A class for tracking the topics, partitions, and offsets for the consumer. A partition
* is "assigned" either directly with {@link #assignFromUser(Set)} (manual assignment)
@@ -422,8 +426,29 @@ public class SubscriptionState {
assignedState(tp).position(position);
}
- public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) {
- return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+ /**
+ * Attempt to enter the offset validation state unless the leader for this partition is known to lack a usable version
+ * of the OffsetsForLeaderEpoch API. If the leader's API versions are known and too old, simply complete the offset validation.
+ *
+ * @param apiVersions
+ * @param tp
+ * @param leaderAndEpoch
+ * @return true if we enter the offset validation state
+ */
+ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp,
+ Metadata.LeaderAndEpoch leaderAndEpoch) {
+ if (leaderAndEpoch.leader.isPresent()) {
+ NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
+ if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+ return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+ } else {
+ // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
+ completeValidation(tp);
+ return false;
+ }
+ } else {
+ return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+ }
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5203745..96cd22c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -313,6 +313,10 @@ public class MetadataTest {
boolean[] updateResult = {true, false, false, false, false, true, false, false, false, true};
TopicPartition tp = new TopicPartition("topic", 0);
+ MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1,
+ Collections.emptyMap(), Collections.singletonMap("topic", 1), _tp -> 0);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
+
for (int i = 0; i < epochs.length; i++) {
metadata.updateLastSeenEpochIfNewer(tp, epochs[i]);
if (updateResult[i]) {
@@ -326,6 +330,46 @@ public class MetadataTest {
}
@Test
+ public void testUpdateLastEpoch() {
+ TopicPartition tp = new TopicPartition("topic-1", 0);
+
+ MetadataResponse metadataResponse = emptyMetadataResponse();
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+ // if we have no leader epoch, this call shouldn't do anything
+ assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
+ assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
+ assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 2));
+ assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
+
+ // Metadata with newer epoch is handled
+ metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+ // Don't update to an older one
+ assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+ // Don't cause update if it's the same one
+ assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+ // Update if we see newer epoch
+ assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12));
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+
+ metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+
+ // Don't overwrite metadata with older epoch
+ metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 11);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+ assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+ }
+
+ @Test
public void testRejectOldMetadata() {
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topic-1", 1);
@@ -379,26 +423,6 @@ public class MetadataTest {
}
@Test
- public void testMaybeRequestUpdate() {
- TopicPartition tp = new TopicPartition("topic-1", 0);
- metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
- assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 1));
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-
- metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 1L);
- assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-
- metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 2L);
- assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-
- metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 3L);
- assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 2));
- assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 2);
- }
-
- @Test
public void testOutOfBandEpochUpdate() {
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topic-1", 5);
@@ -406,7 +430,7 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
- assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 99));
+ assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 99));
// Update epoch to 100
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
@@ -414,7 +438,7 @@ public class MetadataTest {
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
- // Simulate a leader epoch from another response, like a fetch response (not yet implemented)
+ // Simulate a leader epoch from another response, like a fetch response or list offsets
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
// Cache of partition stays, but current partition info is not available since it's stale
@@ -454,6 +478,11 @@ public class MetadataTest {
assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(0, metadata.partitionMetadataIfCurrent(tp).get().partition());
assertEquals(Optional.of(0), metadata.partitionMetadataIfCurrent(tp).get().leaderId);
+
+ // Since epoch was null, this shouldn't update it
+ metadata.updateLastSeenEpochIfNewer(tp, 10);
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ assertFalse(metadata.partitionMetadataIfCurrent(tp).get().leaderEpoch.isPresent());
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5a88750..d85df27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1595,6 +1595,53 @@ public class FetcherTest {
}
@Test
+ public void testListOffsetNoUpdateMissingEpoch() {
+ buildFetcher();
+
+ // Set up metadata with no leader epoch
+ subscriptions.assignFromUser(singleton(tp0));
+ MetadataResponse metadataWithNoLeaderEpochs = TestUtils.metadataUpdateWith(
+ "kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> null);
+ client.updateMetadata(metadataWithNoLeaderEpochs);
+
+ // Return a ListOffsets response with leaderEpoch=1, we should ignore it
+ subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+ client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+ listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1));
+ fetcher.resetOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ // Reset should be satisfied and no metadata update requested
+ assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+ assertFalse(metadata.updateRequested());
+ assertFalse(metadata.lastSeenLeaderEpoch(tp0).isPresent());
+ }
+
+ @Test
+ public void testListOffsetUpdateEpoch() {
+ buildFetcher();
+
+ // Set up metadata with leaderEpoch=1
+ subscriptions.assignFromUser(singleton(tp0));
+ MetadataResponse metadataWithLeaderEpochs = TestUtils.metadataUpdateWith(
+ "kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> 1);
+ client.updateMetadata(metadataWithLeaderEpochs);
+
+ // Reset offsets to trigger ListOffsets call
+ subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+ // Now we see a ListOffsets response with leaderEpoch=2, which should trigger a metadata update
+ client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, 1),
+ listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 2));
+ fetcher.resetOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+ assertTrue(metadata.updateRequested());
+ assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals((long) epoch, 2));
+ }
+
+ @Test
public void testUpdateFetchPositionDisconnect() {
buildFetcher();
assignFromUser(singleton(tp0));
@@ -3676,18 +3723,35 @@ public class FetcherTest {
apiVersions.update(node.idString(), NodeApiVersions.create(
ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
- // Seek with a position and leader+epoch
- Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
- metadata.currentLeader(tp0).leader, Optional.of(epochOne));
- subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
+ {
+ // Seek with a position and leader+epoch
+ Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+ metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+ subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
- // Update metadata to epoch=2, enter validation
- metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
- Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
- fetcher.validateOffsetsIfNeeded();
+ // Update metadata to epoch=2, enter validation
+ metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+ Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
+ fetcher.validateOffsetsIfNeeded();
- // Offset validation is skipped
- assertFalse(subscriptions.awaitingValidation(tp0));
+ // Offset validation is skipped
+ assertFalse(subscriptions.awaitingValidation(tp0));
+ }
+
+ {
+ // Seek with a position and leader+epoch
+ Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+ metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+ subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
+
+ // Update metadata to epoch=2, enter validation
+ metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+ Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
+
+ // Subscription should not stay in AWAITING_VALIDATION in prepareFetchRequest
+ assertEquals(1, fetcher.sendFetches());
+ assertFalse(subscriptions.awaitingValidation(tp0));
+ }
}
@Test
@@ -3767,7 +3831,7 @@ public class FetcherTest {
Optional.of(epochTwo),
new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo)));
subscriptions.position(tp0, nextPosition);
- subscriptions.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree)));
+ subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree)));
// Prepare offset list response from async validation with epoch=2
Map<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<>();
@@ -4044,13 +4108,27 @@ public class FetcherTest {
};
}
+ private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, final int leaderEpoch) {
+ // matches any list offset request for tp0 with the provided timestamp and leader epoch
+ return body -> {
+ ListOffsetRequest req = (ListOffsetRequest) body;
+ return req.partitionTimestamps().equals(Collections.singletonMap(
+ tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.of(leaderEpoch))));
+ };
+ }
+
private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
return listOffsetResponse(tp0, error, timestamp, offset);
}
private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
- ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, offset,
- Optional.empty());
+ return listOffsetResponse(tp, error, timestamp, offset, null);
+ }
+
+ private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset,
+ Integer leaderEpoch) {
+ ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(
+ error, timestamp, offset, Optional.ofNullable(leaderEpoch));
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp, partitionData);
return new ListOffsetResponse(allPartitionData);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 96f08f5..cc74652 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -16,12 +16,15 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
@@ -382,13 +385,15 @@ public class SubscriptionStateTest {
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
+ ApiVersions apiVersions = new ApiVersions();
+ apiVersions.update(broker1.idString(), NodeApiVersions.create());
- assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+ assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.empty())));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
- assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+ assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(10))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
@@ -414,6 +419,9 @@ public class SubscriptionStateTest {
@Test
public void testSeekUnvalidatedWithOffsetEpoch() {
Node broker1 = new Node(1, "localhost", 9092);
+ ApiVersions apiVersions = new ApiVersions();
+ apiVersions.update(broker1.idString(), NodeApiVersions.create());
+
state.assignFromUser(Collections.singleton(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
@@ -422,19 +430,19 @@ public class SubscriptionStateTest {
assertTrue(state.awaitingValidation(tp0));
// Update using the current leader and epoch
- assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+ assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(5))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// Update with a newer leader and epoch
- assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+ assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(15))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// If the updated leader has no epoch information, then skip validation and begin fetching
- assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+ assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.empty())));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
@@ -511,6 +519,34 @@ public class SubscriptionStateTest {
}
@Test
+ public void testMaybeValidatePositionForCurrentLeader() {
+ NodeApiVersions oldApis = NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2);
+ ApiVersions apiVersions = new ApiVersions();
+ apiVersions.update("1", oldApis);
+
+ Node broker1 = new Node(1, "localhost", 9092);
+ state.assignFromUser(Collections.singleton(tp0));
+
+ state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
+ new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
+
+ // if API is too old to be usable, we just skip validation
+ assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.of(10))));
+ assertTrue(state.hasValidPosition(tp0));
+
+ // New API
+ apiVersions.update("1", NodeApiVersions.create());
+ state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
+ new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
+
+ // API is new enough to be usable, so we enter the validation state
+ assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
+ Optional.of(broker1), Optional.of(10))));
+ assertFalse(state.hasValidPosition(tp0));
+ }
+
+ @Test
public void testMaybeCompleteValidationAfterPositionChange() {
Node broker1 = new Node(1, "localhost", 9092);
state.assignFromUser(Collections.singleton(tp0));