You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/02/16 22:35:38 UTC
[kafka] branch 2.4 updated: KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 667ec2d KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)
667ec2d is described below
commit 667ec2ddf9be741ab48cc011039e70e985099ed9
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sun Feb 16 12:06:33 2020 -0800
KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)
Today, if we attempt to list offsets with a fenced leader epoch, the consumer will retry without updating its metadata until the timeout is reached. This affects synchronous APIs such as `offsetsForTimes`, `beginningOffsets`, and `endOffsets`. The fix in this patch is to trigger a metadata update whenever we see a retriable error, before making any additional attempts.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/consumer/internals/Fetcher.java | 128 +++++++++---------
.../kafka/common/requests/ListOffsetRequest.java | 14 ++
.../clients/consumer/internals/FetcherTest.java | 147 +++++++++++++++------
3 files changed, 186 insertions(+), 103 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 384da31..a8cb4eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -70,6 +70,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.LogContext;
@@ -536,24 +537,21 @@ public class Fetcher<K, V> implements Closeable {
RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
client.poll(future, timer);
- if (!future.isDone())
+ if (!future.isDone()) {
break;
-
- if (future.succeeded()) {
+ } else if (future.succeeded()) {
ListOffsetResult value = future.value();
result.fetchedOffsets.putAll(value.fetchedOffsets);
- if (value.partitionsToRetry.isEmpty())
- return result;
-
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
} else if (!future.isRetriable()) {
throw future.exception();
}
- if (metadata.updateRequested())
+ if (remainingToSearch.isEmpty()) {
+ return result;
+ } else {
client.awaitMetadataUpdate(timer);
- else
- timer.sleep(retryBackoffMs);
+ }
} while (timer.notExpired());
throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
@@ -921,8 +919,9 @@ public class Fetcher<K, V> implements Closeable {
currentInfo.get().partitionInfo().leader(), tp);
partitionsToRetry.add(tp);
} else {
- partitionDataMap.put(tp,
- new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
+ int currentEpoch = currentInfo.get().epoch();
+ Optional<Integer> currentEpochOpt = currentEpoch < 0 ? Optional.empty() : Optional.of(currentEpoch);
+ partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, currentEpochOpt));
}
}
return regroupPartitionMapByNode(partitionDataMap);
@@ -978,62 +977,66 @@ public class Fetcher<K, V> implements Closeable {
TopicPartition topicPartition = entry.getKey();
ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
Errors error = partitionData.error;
- if (error == Errors.NONE) {
- if (partitionData.offsets != null) {
- // Handle v0 response
- long offset;
- if (partitionData.offsets.size() > 1) {
- future.raise(new IllegalStateException("Unexpected partitionData response of length " +
- partitionData.offsets.size()));
- return;
- } else if (partitionData.offsets.isEmpty()) {
- offset = ListOffsetResponse.UNKNOWN_OFFSET;
- } else {
- offset = partitionData.offsets.get(0);
- }
- log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
+ switch (error) {
+ case NONE:
+ if (partitionData.offsets != null) {
+ // Handle v0 response
+ long offset;
+ if (partitionData.offsets.size() > 1) {
+ future.raise(new IllegalStateException("Unexpected partitionData response of length " +
+ partitionData.offsets.size()));
+ return;
+ } else if (partitionData.offsets.isEmpty()) {
+ offset = ListOffsetResponse.UNKNOWN_OFFSET;
+ } else {
+ offset = partitionData.offsets.get(0);
+ }
+ log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
topicPartition, offset);
- if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
- ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
- fetchedOffsets.put(topicPartition, offsetData);
- }
- } else {
- // Handle v1 and later response
- log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
+ if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+ ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
+ fetchedOffsets.put(topicPartition, offsetData);
+ }
+ } else {
+ // Handle v1 and later response
+ log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partitionData.offset, partitionData.timestamp);
- if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
- ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
+ if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+ ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
partitionData.leaderEpoch);
- fetchedOffsets.put(topicPartition, offsetData);
+ fetchedOffsets.put(topicPartition, offsetData);
+ }
}
- }
- } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
- // The message format on the broker side is before 0.10.0, which means it does not
- // support timestamps. We treat this case the same as if we weren't able to find an
- // offset corresponding to the requested timestamp and leave it out of the result.
- log.debug("Cannot search by timestamp for partition {} because the message format version " +
- "is before 0.10.0", topicPartition);
- } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
- error == Errors.REPLICA_NOT_AVAILABLE ||
- error == Errors.KAFKA_STORAGE_ERROR ||
- error == Errors.OFFSET_NOT_AVAILABLE ||
- error == Errors.LEADER_NOT_AVAILABLE) {
- log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
+ break;
+ case UNSUPPORTED_FOR_MESSAGE_FORMAT:
+ // The message format on the broker side is before 0.10.0, which means it does not
+ // support timestamps. We treat this case the same as if we weren't able to find an
+ // offset corresponding to the requested timestamp and leave it out of the result.
+ log.debug("Cannot search by timestamp for partition {} because the message format version " +
+ "is before 0.10.0", topicPartition);
+ break;
+ case NOT_LEADER_FOR_PARTITION:
+ case REPLICA_NOT_AVAILABLE:
+ case KAFKA_STORAGE_ERROR:
+ case OFFSET_NOT_AVAILABLE:
+ case LEADER_NOT_AVAILABLE:
+ case FENCED_LEADER_EPOCH:
+ case UNKNOWN_LEADER_EPOCH:
+ log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
topicPartition, error);
- partitionsToRetry.add(topicPartition);
- } else if (error == Errors.FENCED_LEADER_EPOCH ||
- error == Errors.UNKNOWN_LEADER_EPOCH) {
- log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
- topicPartition, error);
- partitionsToRetry.add(topicPartition);
- } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
- log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
- partitionsToRetry.add(topicPartition);
- } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
- unauthorizedTopics.add(topicPartition.topic());
- } else {
- log.warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message());
- partitionsToRetry.add(topicPartition);
+ partitionsToRetry.add(topicPartition);
+ break;
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
+ partitionsToRetry.add(topicPartition);
+ break;
+ case TOPIC_AUTHORIZATION_FAILED:
+ unauthorizedTopics.add(topicPartition.topic());
+ break;
+ default:
+ log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.",
+ topicPartition, error.message());
+ partitionsToRetry.add(topicPartition);
}
}
@@ -1233,7 +1236,6 @@ public class Fetcher<K, V> implements Closeable {
});
}
-
nextCompletedFetch.initialized = true;
} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
error == Errors.REPLICA_NOT_AVAILABLE ||
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 47c47d2..01418c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -204,6 +205,19 @@ public class ListOffsetRequest extends AbstractRequest {
}
@Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PartitionData)) return false;
+ PartitionData other = (PartitionData) obj;
+ return this.timestamp == other.timestamp &&
+ this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp, currentLeaderEpoch);
+ }
+
+ @Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("{timestamp: ").append(timestamp).
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1aa7814..436c40b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -65,7 +65,6 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
@@ -406,13 +405,10 @@ public class FetcherTest {
}
private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
- return new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- FetchRequest fetch = (FetchRequest) body;
- return fetch.fetchData().containsKey(tp) &&
- fetch.fetchData().get(tp).fetchOffset == offset;
- }
+ return body -> {
+ FetchRequest fetch = (FetchRequest) body;
+ return fetch.fetchData().containsKey(tp) &&
+ fetch.fetchData().get(tp).fetchOffset == offset;
};
}
@@ -2371,13 +2367,9 @@ public class FetcherTest {
@Test
public void testGetOffsetsForTimesTimeout() {
- try {
- buildFetcher();
- fetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L));
- fail("Should throw timeout exception.");
- } catch (TimeoutException e) {
- // let it go.
- }
+ buildFetcher();
+ assertThrows(TimeoutException.class, () -> fetcher.offsetsForTimes(
+ Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L)));
}
@Test
@@ -2385,7 +2377,7 @@ public class FetcherTest {
buildFetcher();
// Empty map
- assertTrue(fetcher.offsetsForTimes(new HashMap<TopicPartition, Long>(), time.timer(100L)).isEmpty());
+ assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());
// Unknown Offset
testGetOffsetsForTimesWithUnknownOffset();
// Error code none with unknown offset
@@ -2422,6 +2414,89 @@ public class FetcherTest {
}
@Test
+ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
+ List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_FOR_PARTITION,
+ Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE,
+ Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
+
+ final int newLeaderEpoch = 3;
+ MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith("dummy", 3,
+ singletonMap(topicName, Errors.NONE), singletonMap(topicName, 4), tp -> newLeaderEpoch);
+
+ Node originalLeader = initialUpdateResponse.cluster().leaderFor(tp1);
+ Node newLeader = updatedMetadata.cluster().leaderFor(tp1);
+ assertNotEquals(originalLeader, newLeader);
+
+ for (Errors retriableError : retriableErrors) {
+ buildFetcher();
+
+ subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
+ client.updateMetadata(initialUpdateResponse);
+
+ final long fetchTimestamp = 10L;
+ Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+ allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
+ Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
+ allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
+ retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+
+ client.prepareResponseFrom(body -> {
+ boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+ if (isListOffsetRequest) {
+ ListOffsetRequest request = (ListOffsetRequest) body;
+ Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>();
+ expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData(
+ fetchTimestamp, Optional.empty()));
+ expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData(
+ fetchTimestamp, Optional.empty()));
+
+ return request.partitionTimestamps().equals(expectedTopicPartitions);
+ } else {
+ return false;
+ }
+ }, new ListOffsetResponse(allPartitionData), originalLeader);
+
+ client.prepareMetadataUpdate(updatedMetadata);
+
+ // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception.
+ // We will count the answered future response in the end to verify if this is the case.
+ Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData);
+ paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
+ Errors.NOT_LEADER_FOR_PARTITION, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+ client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader);
+
+ // The request to new leader must only contain one partition tp1 with error.
+ client.prepareResponseFrom(body -> {
+ boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+ if (isListOffsetRequest) {
+ ListOffsetRequest request = (ListOffsetRequest) body;
+
+ return request.partitionTimestamps().equals(
+ Collections.singletonMap(tp1, new ListOffsetRequest.PartitionData(
+ fetchTimestamp, Optional.of(newLeaderEpoch))));
+ } else {
+ return false;
+ }
+ }, listOffsetResponse(tp1, Errors.NONE, fetchTimestamp, 5L), newLeader);
+
+ Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+ fetcher.offsetsForTimes(
+ Utils.mkMap(Utils.mkEntry(tp0, fetchTimestamp),
+ Utils.mkEntry(tp1, fetchTimestamp)), time.timer(Integer.MAX_VALUE));
+
+ assertEquals(Utils.mkMap(
+ Utils.mkEntry(tp0, new OffsetAndTimestamp(4L, fetchTimestamp)),
+ Utils.mkEntry(tp1, new OffsetAndTimestamp(5L, fetchTimestamp))), offsetAndTimestampMap);
+
+ // The NOT_LEADER exception future should not be cleared as we already refreshed the metadata before
+ // first retry, thus never hitting.
+ assertEquals(1, client.numAwaitingResponses());
+
+ fetcher.close();
+ }
+ }
+
+ @Test
public void testGetOffsetsUnknownLeaderEpoch() {
buildFetcher();
subscriptions.assignFromUser(singleton(tp0));
@@ -2475,6 +2550,7 @@ public class FetcherTest {
public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
buildFetcher();
+ subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
final String anotherTopic = "another-topic";
final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
@@ -2583,7 +2659,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
- currentOffset += commitTransaction(buffer, 1L, currentOffset);
+ commitTransaction(buffer, 1L, currentOffset);
buffer.flip();
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -2595,13 +2671,10 @@ public class FetcherTest {
// normal fetch
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- FetchRequest request = (FetchRequest) body;
- assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
- return true;
- }
+ client.prepareResponse(body -> {
+ FetchRequest request = (FetchRequest) body;
+ assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+ return true;
}, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
consumerClient.poll(time.timer(0));
@@ -2733,7 +2806,7 @@ public class FetcherTest {
for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
}
- assertTrue(actuallyCommittedKeys.equals(committedKeys));
+ assertEquals(actuallyCommittedKeys, committedKeys);
}
@Test
@@ -3271,13 +3344,10 @@ public class FetcherTest {
// normal fetch
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- FetchRequest request = (FetchRequest) body;
- assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
- return true;
- }
+ client.prepareResponse(body -> {
+ FetchRequest request = (FetchRequest) body;
+ assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+ return true;
}, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
consumerClient.poll(time.timer(0));
@@ -3311,12 +3381,11 @@ public class FetcherTest {
return appendTransactionalRecords(buffer, pid, baseOffset, (int) baseOffset, records);
}
- private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+ private void commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
short producerEpoch = 0;
int partitionLeaderEpoch = 0;
MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
new EndTransactionMarker(ControlRecordType.COMMIT, 0));
- return 1;
}
private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
@@ -3804,12 +3873,10 @@ public class FetcherTest {
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
// matches any list offset request with the provided timestamp
- return new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- ListOffsetRequest req = (ListOffsetRequest) body;
- return timestamp == req.partitionTimestamps().get(tp0).timestamp;
- }
+ return body -> {
+ ListOffsetRequest req = (ListOffsetRequest) body;
+ return req.partitionTimestamps().equals(Collections.singletonMap(
+ tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.empty())));
};
}