Posted to dev@kafka.apache.org by "Madhav Kelkar (Jira)" <ji...@apache.org> on 2020/03/03 19:51:00 UTC

[jira] [Created] (KAFKA-9641) Kafka Client doesn't handle timeout exceptions for partition fetch requests

Madhav Kelkar created KAFKA-9641:
------------------------------------

             Summary: Kafka Client doesn't handle timeout exceptions for partition fetch requests
                 Key: KAFKA-9641
                 URL: https://issues.apache.org/jira/browse/KAFKA-9641
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 1.1.1, 0.11.0.2, 2.5.0
            Reporter: Madhav Kelkar


Occasionally, we see the Kafka client throw an IllegalStateException, which kills our process. Here is the exception:

{code:java}
java.lang.IllegalStateException: Unexpected error code 7 while fetching data
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
{code}
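Until REQUEST_TIMED_OUT is handled in the client, one way to keep the process alive is to guard the poll loop at the application level. This is only a rough sketch, not what we run in production - it assumes a 2.x client (Duration-based poll), and the class and method names are placeholders:
{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResilientPollLoop {
    private static final Logger log = LoggerFactory.getLogger(ResilientPollLoop.class);

    // Keeps polling even if the Fetcher surfaces an unhandled broker error code
    // as an IllegalStateException, as happens today for REQUEST_TIMED_OUT.
    public static void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // application-specific processing of each record goes here
                }
            } catch (IllegalStateException e) {
                // Thrown from Fetcher.parseCompletedFetch for unhandled error codes;
                // log and let the next poll retry the fetch.
                log.warn("Unexpected error code while fetching, retrying", e);
            }
        }
    }
}
{code}
This is blunt, since IllegalStateException can also signal genuine misuse of the consumer, which is why handling the error inside the Fetcher seems like the right fix.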
 

Error code 7 is Errors.REQUEST_TIMED_OUT.
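As a sanity check, the mapping can be confirmed against the client's Errors enum (a small sketch using the public Errors.forCode helper):
{code:java}
import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeCheck {
    public static void main(String[] args) {
        // Map the numeric code from the exception message back to the Errors enum.
        Errors error = Errors.forCode((short) 7);
        System.out.println(error);                        // REQUEST_TIMED_OUT
        System.out.println(error.exception().getClass()); // org.apache.kafka.common.errors.TimeoutException
    }
}
{code}
The enum maps this code to a retriable TimeoutException, which suggests the fetch could simply be retried instead of failing the consumer.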

I looked at the client code, and it looks like Errors.REQUEST_TIMED_OUT isn't handled, so the error falls through to the final else branch and an IllegalStateException is thrown instead. This is the code from the 0.11.0.2 client's Fetcher.parseCompletedFetch -

{code:java}
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
    TopicPartition tp = completedFetch.partition;
    FetchResponse.PartitionData partition = completedFetch.partitionData;
    long fetchOffset = completedFetch.fetchedOffset;
    PartitionRecords partitionRecords = null;
    Errors error = partition.error;

    try {
        if (!subscriptions.isFetchable(tp)) {
            // this can happen when a rebalance happened or a partition consumption paused
            // while fetch is still in-flight
            log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
        } else if (error == Errors.NONE) {
            // we are interested in this fetch only if the beginning offset matches the
            // current consumed position
            Long position = subscriptions.position(tp);
            if (position == null || position != fetchOffset) {
                log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                        "the expected offset {}", tp, fetchOffset, position);
                return null;
            }

            log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                    partition.records.sizeInBytes(), tp, position);
            Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
            partitionRecords = new PartitionRecords(tp, completedFetch, batches);

            if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                if (completedFetch.responseVersion < 3) {
                    // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
                            recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
                            " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
                            "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
                            ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
                            recordTooLargePartitions);
                } else {
                    // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
                    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
                        fetchOffset + ". Received a non-empty fetch response from the server, but no " +
                        "complete records were found.");
                }
            }

            if (partition.highWatermark >= 0) {
                log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                subscriptions.updateHighWatermark(tp, partition.highWatermark);
            }

            if (partition.lastStableOffset >= 0) {
                log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
            }
        } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
            this.metadata.requestUpdate();
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
                    "may not exist or the user may not have Describe access to it", tp);
            this.metadata.requestUpdate();
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
            if (fetchOffset != subscriptions.position(tp)) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                        "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
            } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                subscriptions.needOffsetReset(tp);
            } else {
                throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
            }
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            log.warn("Not authorized to read from topic {}.", tp.topic());
            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        } else if (error == Errors.UNKNOWN) {
            log.warn("Unknown error fetching data for topic-partition {}", tp);
        } else {
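            // Note: REQUEST_TIMED_OUT (error code 7) is not matched by any branch above,
            // so it ends up here and surfaces as the IllegalStateException shown at the top.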
            throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
        }
    } finally {
        if (partitionRecords == null)
            completedFetch.metricAggregator.record(tp, 0, 0);

        if (error != Errors.NONE)
            // we move the partition to the end if there was an error. This way, it's more likely that partitions for
            // the same topic can remain together (allowing for more efficient serialization).
            subscriptions.movePartitionToEnd(tp);
    }

    return partitionRecords;
}
{code}
I looked at other versions, and it looks like REQUEST_TIMED_OUT isn't handled there either.
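Since the protocol treats REQUEST_TIMED_OUT as retriable, one possible fix - only a sketch against the 0.11.0.2 code above, not a tested patch - would be an extra branch in parseCompletedFetch before the final else, so the timeout is logged and the fetch is retried on the next cycle:
{code:java}
} else if (error == Errors.REQUEST_TIMED_OUT) {
    // The broker timed out while serving this fetch. The error is retriable,
    // so just log it; the finally block already moves the partition to the end
    // of the fetch order, and the next fetch request will try again.
    log.debug("Fetch request for partition {} timed out on the broker, will retry", tp);
}
{code}
Whether a metadata refresh should also be requested here is debatable; the main point is that error code 7 should not fall through to the unexpected-error branch and kill the consumer.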

--
This message was sent by Atlassian Jira
(v8.3.4#803005)