Posted to dev@kafka.apache.org by "Yu Wang (Jira)" <ji...@apache.org> on 2022/09/29 14:19:00 UTC

[jira] [Created] (KAFKA-14266) MirrorSourceTask will stop mirroring when get the corrupt record

Yu Wang created KAFKA-14266:
-------------------------------

             Summary: MirrorSourceTask will stop mirroring when get the corrupt record
                 Key: KAFKA-14266
                 URL: https://issues.apache.org/jira/browse/KAFKA-14266
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
    Affects Versions: 3.2.3, 2.5.1
            Reporter: Yu Wang


The mirror task keeps throwing this error after it encounters a corrupt record:

 
{code:java}
[2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
        ... 12 more {code}
 

 

In the poll function of {*}MirrorSourceTask{*}, when the task catches a *KafkaException*, it only logs a warning and returns null.

 
{code:java}
@Override
public List<SourceRecord> poll() {
    if (!consumerAccess.tryAcquire()) {
        return null;
    }
    if (stopping) {
        return null;
    }
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
        List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
        ...
        if (sourceRecords.isEmpty()) {
            // WorkerSourceTasks expects non-zero batch size
            return null;
        } else {
            log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
            return sourceRecords;
        }
    } catch (WakeupException e) {
        return null;
    } catch (KafkaException e) {
        log.warn("Failure during poll.", e);
        return null;
    } catch (Throwable e)  {
        log.error("Failure during poll.", e);
        // allow Connect to deal with the exception
        throw e;
    } finally {
        consumerAccess.release();
    }
} {code}
Once the consumer has received a corrupt record, it keeps throwing this exception on every poll. As a result, *MirrorSourceTask* cannot fetch the next records and stays blocked at the same offset.
{code:java}
private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
    // Error when fetching the next record before deserialization.
    if (corruptLastRecord)
        throw new KafkaException("Received exception when fetching the next record from " + partition
                                     + ". If needed, please seek past the record to "
                                     + "continue consumption.", cachedRecordException);

...

} {code}
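The combined effect of the two snippets above can be modeled without any Kafka classes. The following is only a toy sketch: StuckFetchModel, its methods, and the use of IllegalStateException in place of KafkaException are all hypothetical stand-ins, not the real Fetcher or MirrorSourceTask code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the consumer and the task's error handling.
// It uses no Kafka classes; all names here are hypothetical.
class StuckFetchModel {
    private final long corruptOffset; // offset of the simulated corrupt record
    private long position;            // next offset the model would return

    StuckFetchModel(long startOffset, long corruptOffset) {
        this.position = startOffset;
        this.corruptOffset = corruptOffset;
    }

    long position() {
        return position;
    }

    // Returns up to maxRecords offsets that precede the corrupt record.
    // Once the position sits on the corrupt record, every call throws
    // and the position does not move, like the corruptLastRecord branch.
    List<Long> poll(int maxRecords) {
        if (position == corruptOffset) {
            throw new IllegalStateException("Record is corrupt at offset " + position);
        }
        List<Long> offsets = new ArrayList<>();
        while (offsets.size() < maxRecords && position != corruptOffset) {
            offsets.add(position++);
        }
        return offsets;
    }

    // Models the catch (KafkaException e) branch: swallow the error, return null.
    List<Long> pollSwallowingErrors(int maxRecords) {
        try {
            return poll(maxRecords);
        } catch (IllegalStateException e) {
            return null;
        }
    }
}
```

With a start offset of 0 and a corrupt offset of 3, the first call to pollSwallowingErrors returns offsets 0 through 2. Every later call returns null while position() remains 3, which is the silent stall described here.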
Because no metric exposes this failure, there is nothing for users to alert on. Once the retention time elapses, the records after the corrupt record in the source topic are lost and can no longer be mirrored.

It would therefore be better for the mirror source task to throw the exception, or to expose metrics that let users alert on this kind of issue.
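
One possible shape for this suggestion is sketched below. It is not a proposed patch: PollFailureGuard, its threshold, and its counters are hypothetical names introduced only for illustration, and the sketch assumes that an exception thrown from poll() is left to the Connect framework to handle, as the existing catch (Throwable e) branch does. The guard tolerates a limited number of consecutive failures, counts every failure so that a metric could report it, and rethrows once the limit is reached.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka or MirrorMaker.
class PollFailureGuard {
    private final int maxConsecutiveFailures; // assumed, user-chosen threshold
    private int consecutiveFailures = 0;
    private long totalFailures = 0;           // value a metric could expose

    PollFailureGuard(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    // Runs one poll attempt. Returns its result on success, returns null
    // for a tolerated failure, and rethrows once the threshold is reached.
    <T> T attempt(Supplier<T> poll) {
        try {
            T result = poll.get();
            consecutiveFailures = 0; // any successful poll resets the streak
            return result;
        } catch (RuntimeException e) {
            totalFailures++;
            consecutiveFailures++;
            if (consecutiveFailures >= maxConsecutiveFailures) {
                throw e; // surface the error instead of retrying forever
            }
            return null;
        }
    }

    long totalFailures() {
        return totalFailures;
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }
}
```

KafkaException is a RuntimeException, so a wrapper of this kind could in principle sit around the consumer.poll call. Whether to fail the task, skip the record, or only report a metric is a design decision this sketch does not settle.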

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)