You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/21 20:51:19 UTC
[kafka] branch 2.3 updated: KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 4b04f55 KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
4b04f55 is described below
commit 4b04f5519f59264560c6265fc0bfe94eaca6ca9a
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue May 21 21:46:04 2019 +0100
KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
Reviewers: Jason Gustafson <ja...@confluent.io>, Colin Patrick McCabe <cm...@confluent.io>, Andrew Olson <ao...@cerner.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
.../kafka/clients/consumer/internals/Fetcher.java | 103 ++++++++++++---------
.../clients/consumer/internals/FetcherTest.java | 56 +++++++++++
2 files changed, 116 insertions(+), 43 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4ea9b0b..7b633d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -111,6 +111,10 @@ import static java.util.Collections.emptyList;
* the caller.</li>
* <li>Responses that collate partial responses from multiple brokers (e.g. to list offsets) are
* synchronized on the response future.</li>
+ * <li>At most one request is pending for each node at any time. Nodes with pending requests are
+ * tracked and updated after processing the response. This ensures that any state (e.g. epoch)
+ * updated while processing responses on one thread is visible while creating the subsequent request
+ * on a different thread.</li>
* </ul>
*/
public class Fetcher<K, V> implements Closeable {
@@ -139,6 +143,7 @@ public class Fetcher<K, V> implements Closeable {
private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+ private final Set<Integer> nodesWithPendingFetchRequests;
private PartitionRecords nextInLineRecords = null;
@@ -183,6 +188,7 @@ public class Fetcher<K, V> implements Closeable {
this.isolationLevel = isolationLevel;
this.sessionHandlers = new HashMap<>();
this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+ this.nodesWithPendingFetchRequests = new HashSet<>();
}
/**
@@ -237,63 +243,73 @@ public class Fetcher<K, V> implements Closeable {
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
- @SuppressWarnings("unchecked")
- FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
- FetchSessionHandler handler = sessionHandler(fetchTarget.id());
- if (handler == null) {
- log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
- fetchTarget.id());
- return;
- }
- if (!handler.handleResponse(response)) {
- return;
- }
+ try {
+ @SuppressWarnings("unchecked")
+ FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
+ FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+ if (handler == null) {
+ log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
+ fetchTarget.id());
+ return;
+ }
+ if (!handler.handleResponse(response)) {
+ return;
+ }
- Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
- FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
-
- for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
- TopicPartition partition = entry.getKey();
- FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
- if (requestData == null) {
- String message;
- if (data.metadata().isFull()) {
- message = MessageFormatter.arrayFormat(
- "Response for missing full request partition: partition={}; metadata={}",
- new Object[]{partition, data.metadata()}).getMessage();
+ Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+ FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+ for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
+ TopicPartition partition = entry.getKey();
+ FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
+ if (requestData == null) {
+ String message;
+ if (data.metadata().isFull()) {
+ message = MessageFormatter.arrayFormat(
+ "Response for missing full request partition: partition={}; metadata={}",
+ new Object[]{partition, data.metadata()}).getMessage();
+ } else {
+ message = MessageFormatter.arrayFormat(
+ "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
+ new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
+ }
+
+ // Received fetch response for missing session partition
+ throw new IllegalStateException(message);
} else {
- message = MessageFormatter.arrayFormat(
- "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
- new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
- }
-
- // Received fetch response for missing session partition
- throw new IllegalStateException(message);
- } else {
- long fetchOffset = requestData.fetchOffset;
- FetchResponse.PartitionData<Records> fetchData = entry.getValue();
+ long fetchOffset = requestData.fetchOffset;
+ FetchResponse.PartitionData<Records> fetchData = entry.getValue();
- log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
- isolationLevel, fetchOffset, partition, fetchData);
- completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
+ log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+ isolationLevel, fetchOffset, partition, fetchData);
+ completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
+ }
}
- }
- sensors.fetchLatency.record(resp.requestLatencyMs());
+ sensors.fetchLatency.record(resp.requestLatencyMs());
+ } finally {
+ nodesWithPendingFetchRequests.remove(fetchTarget.id());
+ }
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (Fetcher.this) {
- FetchSessionHandler handler = sessionHandler(fetchTarget.id());
- if (handler != null) {
- handler.handleError(e);
+ try {
+ FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+ if (handler != null) {
+ handler.handleError(e);
+ }
+ } finally {
+ nodesWithPendingFetchRequests.remove(fetchTarget.id());
}
}
}
});
+
+ this.nodesWithPendingFetchRequests.add(entry.getKey().id());
}
return fetchRequestMap.size();
}
@@ -1055,8 +1071,9 @@ public class Fetcher<K, V> implements Closeable {
// If we try to send during the reconnect blackout window, then the request is just
// going to be failed anyway before being sent, so skip the send for now
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
- } else if (client.hasPendingRequests(node)) {
- log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
+
+ } else if (this.nodesWithPendingFetchRequests.contains(node.id())) {
+ log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
} else {
// if there is a leader and no in-flight requests, issue a new fetch
FetchSessionHandler.Builder builder = fetchable.get(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1dfdf23..2f40ffc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2941,6 +2941,62 @@ public class FetcherTest {
}
@Test
+ public void testFetcherSessionEpochUpdate() throws Exception {
+ buildFetcher(2);
+
+ MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
+ client.updateMetadata(initialMetadataResponse);
+ assignFromUser(Collections.singleton(tp0));
+ subscriptions.seek(tp0, 0L);
+
+ AtomicInteger fetchesRemaining = new AtomicInteger(1000);
+ executorService = Executors.newSingleThreadExecutor();
+ Future<?> future = executorService.submit(() -> {
+ long nextOffset = 0;
+ long nextEpoch = 0;
+ while (fetchesRemaining.get() > 0) {
+ synchronized (consumerClient) {
+ if (!client.requests().isEmpty()) {
+ ClientRequest request = client.requests().peek();
+ FetchRequest fetchRequest = (FetchRequest) request.requestBuilder().build();
+ int epoch = fetchRequest.metadata().epoch();
+ assertTrue(String.format("Unexpected epoch expected %d got %d", nextEpoch, epoch), epoch == 0 || epoch == nextEpoch);
+ nextEpoch++;
+ LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseMap = new LinkedHashMap<>();
+ responseMap.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, nextOffset + 2L, nextOffset + 2,
+ 0L, null, buildRecords(nextOffset, 2, nextOffset)));
+ nextOffset += 2;
+ client.respondToRequest(request, new FetchResponse<>(Errors.NONE, responseMap, 0, 123));
+ consumerClient.poll(time.timer(0));
+ }
+ }
+ }
+ return fetchesRemaining.get();
+ });
+ long nextFetchOffset = 0;
+ while (fetchesRemaining.get() > 0 && !future.isDone()) {
+ if (fetcher.sendFetches() == 1) {
+ synchronized (consumerClient) {
+ consumerClient.poll(time.timer(0));
+ }
+ }
+ if (fetcher.hasCompletedFetches()) {
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+ if (!fetchedRecords.isEmpty()) {
+ fetchesRemaining.decrementAndGet();
+ List<ConsumerRecord<byte[], byte[]>> records = fetchedRecords.get(tp0);
+ assertEquals(2, records.size());
+ assertEquals(nextFetchOffset, records.get(0).offset());
+ assertEquals(nextFetchOffset + 1, records.get(1).offset());
+ nextFetchOffset += 2;
+ }
+ assertTrue(fetchedRecords().isEmpty());
+ }
+ }
+ assertEquals(0, future.get());
+ }
+
+ @Test
public void testEmptyControlBatch() {
buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);