You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Waleed Fateem (Jira)" <ji...@apache.org> on 2022/12/17 16:48:00 UTC

[jira] [Created] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned

Waleed Fateem created KAFKA-14520:
-------------------------------------

             Summary: TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned
                 Key: KAFKA-14520
                 URL: https://issues.apache.org/jira/browse/KAFKA-14520
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 3.2.1
            Reporter: Waleed Fateem


I'm on the fence on whether or not this should actually be considered a bug, but decided to open it as such from the perspective of a sink developer. Even though there's a sign of a potential issue on the Kafka broker's side, we're dependent on Kafka Connect to provide a level of robustness so we don't have to manually intervene to restart the connector.

We don't have access to the Kafka broker cluster, so we don't know what the underlying issue might be that caused the following error during a rebalance:
{code:java}
Nov 21, 2022 @ 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic-partition-2 could be determined {code}
That leads to the following problem:

 

 
{code:java}
Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPartitionsAssigned for partitions [<list of partitions>] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-the-sink-1] 

{code}
The KafkaConsumer's position() method invoked in the WorkerSinkTask's HandleRebalance code causing that TimeoutException is [here|[https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]:]]:

 
{code:java}
    private class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions){
            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
            for (TopicPartition tp : partitions) {                long pos = consumer.position(tp);                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));                currentOffsets.put(tp, new OffsetAndMetadata(pos));                log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);            }{code}
Which is then considered an unrecoverable error [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
{code:java}
Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable except ion. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1] {code}
Do we expect that TimeoutException to cause the task to be killed, or should have this been handled ideally somehow in the WorkerSinkTask's HandleRebalance code?

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)