Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/16 20:30:34 UTC

[kafka] branch 2.5 updated: KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 8535313  KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)
8535313 is described below

commit 853531329331374b101337a7a83709b817db9e15
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sun Feb 16 12:06:33 2020 -0800

    KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)
    
    Today, if we attempt to list offsets with a fenced leader epoch, the consumer retries without updating the metadata until the timeout is reached. This affects synchronous APIs such as `offsetsForTimes`, `beginningOffsets`, and `endOffsets`. The fix in this patch is to trigger a metadata update whenever we see a retriable error, before making additional attempts.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
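
For context, a minimal caller-side sketch (editor's illustration, not part of the commit) of one of the affected synchronous APIs. The bootstrap address, topic name, timestamp, and timeout below are assumed placeholder values; before this patch, a fenced leader epoch could make the call loop on stale metadata until it threw a TimeoutException:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        TopicPartition tp = new TopicPartition("my-topic", 0);   // assumed topic
        // offsetsForTimes blocks until offsets are found or the timeout expires.
        // With the bug, a fenced leader epoch caused retries against the stale
        // leader (no metadata refresh) until this timeout was exhausted.
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            consumer.offsetsForTimes(Collections.singletonMap(tp, 1000L), Duration.ofSeconds(30));
        System.out.println(offsets);
    }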
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 125 +++++++++---------
 .../kafka/common/requests/ListOffsetRequest.java   |  14 ++
 .../clients/consumer/internals/FetcherTest.java    | 146 +++++++++++++++------
 3 files changed, 181 insertions(+), 104 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f0aaa13..8e474fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -536,26 +536,21 @@ public class Fetcher<K, V> implements Closeable {
             RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
             client.poll(future, timer);
 
-            if (!future.isDone())
+            if (!future.isDone()) {
                 break;
-
-            if (future.succeeded()) {
+            } else if (future.succeeded()) {
                 ListOffsetResult value = future.value();
                 result.fetchedOffsets.putAll(value.fetchedOffsets);
-                if (value.partitionsToRetry.isEmpty())
-                    return result;
-
                 remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
-            } else {
-                metadata.requestUpdate();
             }
 
-            if (metadata.updateRequested())
+            if (remainingToSearch.isEmpty()) {
+                return result;
+            } else {
                 client.awaitMetadataUpdate(timer);
-            else
-                timer.sleep(retryBackoffMs);
+            }
         } while (timer.notExpired());
 
         throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
@@ -979,63 +974,66 @@ public class Fetcher<K, V> implements Closeable {
             TopicPartition topicPartition = entry.getKey();
             ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
             Errors error = partitionData.error;
-            if (error == Errors.NONE) {
-                if (partitionData.offsets != null) {
-                    // Handle v0 response
-                    long offset;
-                    if (partitionData.offsets.size() > 1) {
-                        future.raise(new IllegalStateException("Unexpected partitionData response of length " +
-                                partitionData.offsets.size()));
-                        return;
-                    } else if (partitionData.offsets.isEmpty()) {
-                        offset = ListOffsetResponse.UNKNOWN_OFFSET;
-                    } else {
-                        offset = partitionData.offsets.get(0);
-                    }
-                    log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
+            switch (error) {
+                case NONE:
+                    if (partitionData.offsets != null) {
+                        // Handle v0 response
+                        long offset;
+                        if (partitionData.offsets.size() > 1) {
+                            future.raise(new IllegalStateException("Unexpected partitionData response of length " +
+                                                                       partitionData.offsets.size()));
+                            return;
+                        } else if (partitionData.offsets.isEmpty()) {
+                            offset = ListOffsetResponse.UNKNOWN_OFFSET;
+                        } else {
+                            offset = partitionData.offsets.get(0);
+                        }
+                        log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
                             topicPartition, offset);
-                    if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
-                        fetchedOffsets.put(topicPartition, offsetData);
-                    }
-                } else {
-                    // Handle v1 and later response
-                    log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
+                        if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                            ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
+                            fetchedOffsets.put(topicPartition, offsetData);
+                        }
+                    } else {
+                        // Handle v1 and later response
+                        log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
                             topicPartition, partitionData.offset, partitionData.timestamp);
-                    if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
+                        if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                            ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
                                 partitionData.leaderEpoch);
-                        fetchedOffsets.put(topicPartition, offsetData);
+                            fetchedOffsets.put(topicPartition, offsetData);
+                        }
                     }
-                }
-            } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
-                // The message format on the broker side is before 0.10.0, which means it does not
-                // support timestamps. We treat this case the same as if we weren't able to find an
-                // offset corresponding to the requested timestamp and leave it out of the result.
-                log.debug("Cannot search by timestamp for partition {} because the message format version " +
-                        "is before 0.10.0", topicPartition);
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
-                       error == Errors.REPLICA_NOT_AVAILABLE ||
-                       error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.OFFSET_NOT_AVAILABLE ||
-                       error == Errors.LEADER_NOT_AVAILABLE ||
-                       error == Errors.UNKNOWN_LEADER_EPOCH) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
+                    break;
+                case UNSUPPORTED_FOR_MESSAGE_FORMAT:
+                    // The message format on the broker side is before 0.10.0, which means it does not
+                    // support timestamps. We treat this case the same as if we weren't able to find an
+                    // offset corresponding to the requested timestamp and leave it out of the result.
+                    log.debug("Cannot search by timestamp for partition {} because the message format version " +
+                                  "is before 0.10.0", topicPartition);
+                    break;
+                case NOT_LEADER_FOR_PARTITION:
+                case REPLICA_NOT_AVAILABLE:
+                case KAFKA_STORAGE_ERROR:
+                case OFFSET_NOT_AVAILABLE:
+                case LEADER_NOT_AVAILABLE:
+                case FENCED_LEADER_EPOCH:
+                case UNKNOWN_LEADER_EPOCH:
+                    log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
                         topicPartition, error);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.FENCED_LEADER_EPOCH) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to fenced leader epoch, refresh " +
-                              "the metadata and retrying.", topicPartition);
-                metadata.requestUpdate();
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                unauthorizedTopics.add(topicPartition.topic());
-            } else {
-                log.warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message());
-                partitionsToRetry.add(topicPartition);
+                    partitionsToRetry.add(topicPartition);
+                    break;
+                case UNKNOWN_TOPIC_OR_PARTITION:
+                    log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
+                    partitionsToRetry.add(topicPartition);
+                    break;
+                case TOPIC_AUTHORIZATION_FAILED:
+                    unauthorizedTopics.add(topicPartition.topic());
+                    break;
+                default:
+                    log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.",
+                        topicPartition, error.message());
+                    partitionsToRetry.add(topicPartition);
             }
         }
 
@@ -1239,7 +1237,6 @@ public class Fetcher<K, V> implements Closeable {
                     });
                 }
 
-
                 nextCompletedFetch.initialized = true;
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 17f921f..1b6ad12 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
@@ -205,6 +206,19 @@ public class ListOffsetRequest extends AbstractRequest {
         }
 
         @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof PartitionData)) return false;
+            PartitionData other = (PartitionData) obj;
+            return this.timestamp == other.timestamp &&
+                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(timestamp, currentLeaderEpoch);
+        }
+
+        @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
             bld.append("{timestamp: ").append(timestamp).
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d0eba44..b757e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,7 +66,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -406,13 +405,10 @@ public class FetcherTest {
     }
 
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest fetch = (FetchRequest) body;
-                return fetch.fetchData().containsKey(tp) &&
-                        fetch.fetchData().get(tp).fetchOffset == offset;
-            }
+        return body -> {
+            FetchRequest fetch = (FetchRequest) body;
+            return fetch.fetchData().containsKey(tp) &&
+                    fetch.fetchData().get(tp).fetchOffset == offset;
         };
     }
 
@@ -2371,13 +2367,9 @@ public class FetcherTest {
 
     @Test
     public void testGetOffsetsForTimesTimeout() {
-        try {
-            buildFetcher();
-            fetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L));
-            fail("Should throw timeout exception.");
-        } catch (TimeoutException e) {
-            // let it go.
-        }
+        buildFetcher();
+        assertThrows(TimeoutException.class, () -> fetcher.offsetsForTimes(
+            Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L)));
     }
 
     @Test
@@ -2385,7 +2377,7 @@ public class FetcherTest {
         buildFetcher();
 
         // Empty map
-        assertTrue(fetcher.offsetsForTimes(new HashMap<TopicPartition, Long>(), time.timer(100L)).isEmpty());
+        assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());
         // Unknown Offset
         testGetOffsetsForTimesWithUnknownOffset();
         // Error code none with unknown offset
@@ -2422,6 +2414,89 @@ public class FetcherTest {
     }
 
     @Test
+    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
+        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_FOR_PARTITION,
+            Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE,
+            Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
+
+        final int newLeaderEpoch = 3;
+        MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith("dummy", 3,
+            singletonMap(topicName, Errors.NONE), singletonMap(topicName, 4), tp -> newLeaderEpoch);
+
+        Node originalLeader = initialUpdateResponse.cluster().leaderFor(tp1);
+        Node newLeader = updatedMetadata.cluster().leaderFor(tp1);
+        assertNotEquals(originalLeader, newLeader);
+
+        for (Errors retriableError : retriableErrors) {
+            buildFetcher();
+
+            subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
+            client.updateMetadata(initialUpdateResponse);
+
+            final long fetchTimestamp = 10L;
+            Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+            allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
+                Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
+            allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
+                retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+
+            client.prepareResponseFrom(body -> {
+                boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+                if (isListOffsetRequest) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+                    Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>();
+                    expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData(
+                        fetchTimestamp, Optional.empty()));
+                    expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData(
+                        fetchTimestamp, Optional.empty()));
+
+                    return request.partitionTimestamps().equals(expectedTopicPartitions);
+                } else {
+                    return false;
+                }
+            }, new ListOffsetResponse(allPartitionData), originalLeader);
+
+            client.prepareMetadataUpdate(updatedMetadata);
+
+            // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER error.
+            // We count the unconsumed prepared responses at the end to verify that this did not happen.
+            Map<TopicPartition, ListOffsetResponse.PartitionData> partitionDataWithFatalError = new HashMap<>(allPartitionData);
+            partitionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
+                Errors.NOT_LEADER_FOR_PARTITION, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+            client.prepareResponseFrom(new ListOffsetResponse(partitionDataWithFatalError), originalLeader);
+
+            // The request to new leader must only contain one partition tp1 with error.
+            client.prepareResponseFrom(body -> {
+                boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+                if (isListOffsetRequest) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+
+                    return request.partitionTimestamps().equals(
+                        Collections.singletonMap(tp1, new ListOffsetRequest.PartitionData(
+                            fetchTimestamp, Optional.of(newLeaderEpoch))));
+                } else {
+                    return false;
+                }
+            }, listOffsetResponse(tp1, Errors.NONE, fetchTimestamp, 5L), newLeader);
+
+            Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+                fetcher.offsetsForTimes(
+                    Utils.mkMap(Utils.mkEntry(tp0, fetchTimestamp),
+                    Utils.mkEntry(tp1, fetchTimestamp)), time.timer(Integer.MAX_VALUE));
+
+            assertEquals(Utils.mkMap(
+                Utils.mkEntry(tp0, new OffsetAndTimestamp(4L, fetchTimestamp)),
+                Utils.mkEntry(tp1, new OffsetAndTimestamp(5L, fetchTimestamp))), offsetAndTimestampMap);
+
+            // The prepared NOT_LEADER response must still be pending: the metadata was refreshed before the
+            // first retry, so the stale leader was never contacted again.
+            assertEquals(1, client.numAwaitingResponses());
+
+            fetcher.close();
+        }
+    }
+
+    @Test
     public void testGetOffsetsUnknownLeaderEpoch() {
         buildFetcher();
         subscriptions.assignFromUser(singleton(tp0));
@@ -2616,7 +2691,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        currentOffset += commitTransaction(buffer, 1L, currentOffset);
+        commitTransaction(buffer, 1L, currentOffset);
         buffer.flip();
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -2628,13 +2703,10 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest request = (FetchRequest) body;
-                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
         }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
         consumerClient.poll(time.timer(0));
@@ -2766,7 +2838,7 @@ public class FetcherTest {
         for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
             actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
         }
-        assertTrue(actuallyCommittedKeys.equals(committedKeys));
+        assertEquals(actuallyCommittedKeys, committedKeys);
     }
 
     @Test
@@ -3304,13 +3376,10 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest request = (FetchRequest) body;
-                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
         }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
         consumerClient.poll(time.timer(0));
@@ -3344,12 +3413,11 @@ public class FetcherTest {
         return appendTransactionalRecords(buffer, pid, baseOffset, (int) baseOffset, records);
     }
 
-    private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+    private void commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
         short producerEpoch = 0;
         int partitionLeaderEpoch = 0;
         MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
                 new EndTransactionMarker(ControlRecordType.COMMIT, 0));
-        return 1;
     }
 
     private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
@@ -3843,12 +3911,10 @@ public class FetcherTest {
 
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                ListOffsetRequest req = (ListOffsetRequest) body;
-                return timestamp == req.partitionTimestamps().get(tp0).timestamp;
-            }
+        return body -> {
+            ListOffsetRequest req = (ListOffsetRequest) body;
+            return req.partitionTimestamps().equals(Collections.singletonMap(
+                tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.empty())));
         };
     }