You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/21 20:46:37 UTC

[kafka] branch trunk updated: KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 012880d  KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
012880d is described below

commit 012880d4246fedb5c1ea7621e86217c57fd217e2
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue May 21 21:46:04 2019 +0100

    KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Colin Patrick McCabe <cm...@confluent.io>, Andrew Olson <ao...@cerner.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 103 ++++++++++++---------
 .../clients/consumer/internals/FetcherTest.java    |  56 +++++++++++
 2 files changed, 116 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4ea9b0b..7b633d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -111,6 +111,10 @@ import static java.util.Collections.emptyList;
  *     the caller.</li>
  *     <li>Responses that collate partial responses from multiple brokers (e.g. to list offsets) are
  *     synchronized on the response future.</li>
+ *     <li>At most one fetch request is pending for each node at any time. Nodes with pending requests are
+ *     tracked, and a node is removed from that set only after its response has been processed. This
+ *     ensures that any state (e.g. epoch) updated while processing a response on one thread is visible
+ *     when the subsequent request is created on a different thread.</li>
  * </ul>
  */
 public class Fetcher<K, V> implements Closeable {
@@ -139,6 +143,7 @@ public class Fetcher<K, V> implements Closeable {
     private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
     private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
     private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final Set<Integer> nodesWithPendingFetchRequests;
 
     private PartitionRecords nextInLineRecords = null;
 
@@ -183,6 +188,7 @@ public class Fetcher<K, V> implements Closeable {
         this.isolationLevel = isolationLevel;
         this.sessionHandlers = new HashMap<>();
         this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+        this.nodesWithPendingFetchRequests = new HashSet<>();
     }
 
     /**
@@ -237,63 +243,73 @@ public class Fetcher<K, V> implements Closeable {
                         @Override
                         public void onSuccess(ClientResponse resp) {
                             synchronized (Fetcher.this) {
-                                @SuppressWarnings("unchecked")
-                                FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
-                                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                                if (handler == null) {
-                                    log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
-                                            fetchTarget.id());
-                                    return;
-                                }
-                                if (!handler.handleResponse(response)) {
-                                    return;
-                                }
+                                try {
+                                    @SuppressWarnings("unchecked")
+                                    FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
+                                    FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                                    if (handler == null) {
+                                        log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
+                                                fetchTarget.id());
+                                        return;
+                                    }
+                                    if (!handler.handleResponse(response)) {
+                                        return;
+                                    }
 
-                                Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
-                                FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
-
-                                for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
-                                    TopicPartition partition = entry.getKey();
-                                    FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
-                                    if (requestData == null) {
-                                        String message;
-                                        if (data.metadata().isFull()) {
-                                            message = MessageFormatter.arrayFormat(
-                                                    "Response for missing full request partition: partition={}; metadata={}",
-                                                    new Object[]{partition, data.metadata()}).getMessage();
+                                    Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+                                    FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+                                    for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
+                                        TopicPartition partition = entry.getKey();
+                                        FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
+                                        if (requestData == null) {
+                                            String message;
+                                            if (data.metadata().isFull()) {
+                                                message = MessageFormatter.arrayFormat(
+                                                        "Response for missing full request partition: partition={}; metadata={}",
+                                                        new Object[]{partition, data.metadata()}).getMessage();
+                                            } else {
+                                                message = MessageFormatter.arrayFormat(
+                                                        "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
+                                                        new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
+                                            }
+
+                                            // Received fetch response for missing session partition
+                                            throw new IllegalStateException(message);
                                         } else {
-                                            message = MessageFormatter.arrayFormat(
-                                                    "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
-                                                    new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
-                                        }
-
-                                        // Received fetch response for missing session partition
-                                        throw new IllegalStateException(message);
-                                    } else {
-                                        long fetchOffset = requestData.fetchOffset;
-                                        FetchResponse.PartitionData<Records> fetchData = entry.getValue();
+                                            long fetchOffset = requestData.fetchOffset;
+                                            FetchResponse.PartitionData<Records> fetchData = entry.getValue();
 
-                                        log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
-                                                isolationLevel, fetchOffset, partition, fetchData);
-                                        completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
+                                            log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+                                                    isolationLevel, fetchOffset, partition, fetchData);
+                                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                                     resp.requestHeader().apiVersion()));
+                                        }
                                     }
-                                }
 
-                                sensors.fetchLatency.record(resp.requestLatencyMs());
+                                    sensors.fetchLatency.record(resp.requestLatencyMs());
+                                } finally {
+                                    nodesWithPendingFetchRequests.remove(fetchTarget.id());
+                                }
                             }
                         }
 
                         @Override
                         public void onFailure(RuntimeException e) {
                             synchronized (Fetcher.this) {
-                                FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                                if (handler != null) {
-                                    handler.handleError(e);
+                                try {
+                                    FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                                    if (handler != null) {
+                                        handler.handleError(e);
+                                    }
+                                } finally {
+                                    nodesWithPendingFetchRequests.remove(fetchTarget.id());
                                 }
                             }
                         }
                     });
+
+            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
         }
         return fetchRequestMap.size();
     }
@@ -1055,8 +1071,9 @@ public class Fetcher<K, V> implements Closeable {
                 // If we try to send during the reconnect blackout window, then the request is just
                 // going to be failed anyway before being sent, so skip the send for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
-            } else if (client.hasPendingRequests(node)) {
-                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
+
+            } else if (this.nodesWithPendingFetchRequests.contains(node.id())) {
+                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.get(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1dfdf23..2f40ffc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2941,6 +2941,62 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetcherSessionEpochUpdate() throws Exception {
+        buildFetcher(2);
+
+        MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
+        client.updateMetadata(initialMetadataResponse);
+        assignFromUser(Collections.singleton(tp0));
+        subscriptions.seek(tp0, 0L);
+
+        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
+        executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(() -> {
+            long nextOffset = 0;
+            long nextEpoch = 0;
+            while (fetchesRemaining.get() > 0) {
+                synchronized (consumerClient) {
+                    if (!client.requests().isEmpty()) {
+                        ClientRequest request = client.requests().peek();
+                        FetchRequest fetchRequest = (FetchRequest) request.requestBuilder().build();
+                        int epoch = fetchRequest.metadata().epoch();
+                        assertTrue(String.format("Unexpected epoch expected %d got %d", nextEpoch, epoch), epoch == 0 || epoch == nextEpoch);
+                        nextEpoch++;
+                        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseMap = new LinkedHashMap<>();
+                        responseMap.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, nextOffset + 2L, nextOffset + 2,
+                                0L, null, buildRecords(nextOffset, 2, nextOffset)));
+                        nextOffset += 2;
+                        client.respondToRequest(request, new FetchResponse<>(Errors.NONE, responseMap, 0, 123));
+                        consumerClient.poll(time.timer(0));
+                    }
+                }
+            }
+            return fetchesRemaining.get();
+        });
+        long nextFetchOffset = 0;
+        while (fetchesRemaining.get() > 0 && !future.isDone()) {
+            if (fetcher.sendFetches() == 1) {
+                synchronized (consumerClient) {
+                    consumerClient.poll(time.timer(0));
+                }
+            }
+            if (fetcher.hasCompletedFetches()) {
+                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+                if (!fetchedRecords.isEmpty()) {
+                    fetchesRemaining.decrementAndGet();
+                    List<ConsumerRecord<byte[], byte[]>> records = fetchedRecords.get(tp0);
+                    assertEquals(2, records.size());
+                    assertEquals(nextFetchOffset, records.get(0).offset());
+                    assertEquals(nextFetchOffset + 1, records.get(1).offset());
+                    nextFetchOffset += 2;
+                }
+                assertTrue(fetchedRecords().isEmpty());
+            }
+        }
+        assertEquals(0, future.get());
+    }
+
+    @Test
     public void testEmptyControlBatch() {
         buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);