Posted to dev@kafka.apache.org by "Madhav Kelkar (Jira)" <ji...@apache.org> on 2020/03/03 19:51:00 UTC
[jira] [Created] (KAFKA-9641) Kafka Client doesn't handle timeout
exceptions for partition fetch requests
Madhav Kelkar created KAFKA-9641:
------------------------------------
Summary: Kafka Client doesn't handle timeout exceptions for partition fetch requests
Key: KAFKA-9641
URL: https://issues.apache.org/jira/browse/KAFKA-9641
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 1.1.1, 0.11.0.2, 2.5.0
Reporter: Madhav Kelkar
Occasionally, we see the Kafka client throw an IllegalStateException, which causes our process to die. Here is the exception:
{code:java}
java.lang.IllegalStateException: Unexpected error code 7 while fetching data
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
{code}
Error code 7 is Errors.REQUEST_TIMED_OUT.
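For reference, here is a minimal lookup sketch that maps the numeric codes relevant to this report back to their names. The class name FetchErrorCodes and the method nameOf are hypothetical and exist only for illustration; the table is a small hand-copied subset of the Kafka protocol error codes, not the client's own Errors enum.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka client.
final class FetchErrorCodes {
    // Hand-copied subset of Kafka protocol error codes that appear in the fetch path.
    private static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(-1, "UNKNOWN");
        NAMES.put(0, "NONE");
        NAMES.put(1, "OFFSET_OUT_OF_RANGE");
        NAMES.put(3, "UNKNOWN_TOPIC_OR_PARTITION");
        NAMES.put(6, "NOT_LEADER_FOR_PARTITION");
        NAMES.put(7, "REQUEST_TIMED_OUT");
        NAMES.put(29, "TOPIC_AUTHORIZATION_FAILED");
    }

    // Returns the name for a code, or a placeholder if the code is not in this subset.
    static String nameOf(int code) {
        return NAMES.getOrDefault(code, "(not in this table)");
    }

    public static void main(String[] args) {
        System.out.println("7 -> " + nameOf(7)); // prints "7 -> REQUEST_TIMED_OUT"
    }
}
```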
I looked at the client code, and it does not appear to handle Errors.REQUEST_TIMED_OUT, so the error falls through to the final else branch and throws an IllegalStateException instead. This is the code from the Kafka client 0.11.0.2:
{code:java}
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
    TopicPartition tp = completedFetch.partition;
    FetchResponse.PartitionData partition = completedFetch.partitionData;
    long fetchOffset = completedFetch.fetchedOffset;
    PartitionRecords partitionRecords = null;
    Errors error = partition.error;
    try {
        if (!subscriptions.isFetchable(tp)) {
            // this can happen when a rebalance happened or a partition consumption paused
            // while fetch is still in-flight
            log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
        } else if (error == Errors.NONE) {
            // we are interested in this fetch only if the beginning offset matches the
            // current consumed position
            Long position = subscriptions.position(tp);
            if (position == null || position != fetchOffset) {
                log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                        "the expected offset {}", tp, fetchOffset, position);
                return null;
            }
            log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                    partition.records.sizeInBytes(), tp, position);
            Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
            partitionRecords = new PartitionRecords(tp, completedFetch, batches);
            if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                if (completedFetch.responseVersion < 3) {
                    // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
                            recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
                            " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
                            "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
                            ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
                            recordTooLargePartitions);
                } else {
                    // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
                    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
                            fetchOffset + ". Received a non-empty fetch response from the server, but no " +
                            "complete records were found.");
                }
            }
            if (partition.highWatermark >= 0) {
                log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                subscriptions.updateHighWatermark(tp, partition.highWatermark);
            }
            if (partition.lastStableOffset >= 0) {
                log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
            }
        } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
            this.metadata.requestUpdate();
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
                    "may not exist or the user may not have Describe access to it", tp);
            this.metadata.requestUpdate();
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
            if (fetchOffset != subscriptions.position(tp)) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                        "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
            } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                subscriptions.needOffsetReset(tp);
            } else {
                throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
            }
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            log.warn("Not authorized to read from topic {}.", tp.topic());
            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        } else if (error == Errors.UNKNOWN) {
            log.warn("Unknown error fetching data for topic-partition {}", tp);
        } else {
            throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
        }
    } finally {
        if (partitionRecords == null)
            completedFetch.metricAggregator.record(tp, 0, 0);
        if (error != Errors.NONE)
            // we move the partition to the end if there was an error. This way, it's more likely that partitions for
            // the same topic can remain together (allowing for more efficient serialization).
            subscriptions.movePartitionToEnd(tp);
    }
    return partitionRecords;
}
{code}
I looked at other versions, and it looks like REQUEST_TIMED_OUT is not handled there either.
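To make the fall-through easier to see, here is a stripped-down, standalone model of the error dispatch in parseCompletedFetch. Everything in it is an assumption made for illustration: the Err enum is a stand-in for the client's Errors class, the returned strings only label which branch was taken, and withTimeoutBranch shows one possible way a timeout could be treated as retriable. It is not the actual client code and not a proposed patch.

```java
// Hypothetical model for illustration; not the Kafka client's Fetcher.
final class FetchErrorDispatch {
    // Stand-in for the client's Errors enum, limited to the codes named in this report.
    enum Err {
        NONE(0), OFFSET_OUT_OF_RANGE(1), UNKNOWN_TOPIC_OR_PARTITION(3),
        NOT_LEADER_FOR_PARTITION(6), REQUEST_TIMED_OUT(7),
        TOPIC_AUTHORIZATION_FAILED(29), UNKNOWN(-1);

        final int code;
        Err(int code) { this.code = code; }
    }

    // Same branch order as the quoted method; each branch is reduced to a label.
    static String asQuoted(Err error) {
        if (error == Err.NONE) return "records";
        else if (error == Err.NOT_LEADER_FOR_PARTITION) return "metadata-update";
        else if (error == Err.UNKNOWN_TOPIC_OR_PARTITION) return "metadata-update";
        else if (error == Err.OFFSET_OUT_OF_RANGE) return "offset-reset-or-throw";
        else if (error == Err.TOPIC_AUTHORIZATION_FAILED) return "throw-authorization";
        else if (error == Err.UNKNOWN) return "log-warning";
        // REQUEST_TIMED_OUT has no branch of its own, so it ends up here.
        else throw new IllegalStateException("Unexpected error code " + error.code + " while fetching data");
    }

    // Assumption: a timeout is handled before the chain and the fetch is simply retried later.
    static String withTimeoutBranch(Err error) {
        if (error == Err.REQUEST_TIMED_OUT) return "retry-on-next-poll";
        return asQuoted(error);
    }

    public static void main(String[] args) {
        try {
            asQuoted(Err.REQUEST_TIMED_OUT);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Unexpected error code 7 while fetching data"
        }
        System.out.println(withTimeoutBranch(Err.REQUEST_TIMED_OUT)); // prints "retry-on-next-poll"
    }
}
```

Whether retrying is the right behavior for a timed-out partition fetch is a design question for the client; the sketch only shows that an explicit branch would keep the error out of the final else.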