You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Yu Wang (Jira)" <ji...@apache.org> on 2022/09/29 14:21:00 UTC

[jira] [Updated] (KAFKA-14266) MirrorSourceTask will stop mirroring when get corrupt record

     [ https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Wang updated KAFKA-14266:
----------------------------
    Summary: MirrorSourceTask will stop mirroring when get corrupt record  (was: MirrorSourceTask will stop mirroring when get the corrupt record)

> MirrorSourceTask will stop mirroring when get corrupt record
> ------------------------------------------------------------
>
>                 Key: KAFKA-14266
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14266
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.5.1, 3.2.3
>            Reporter: Yu Wang
>            Priority: Critical
>
> The mirror task will keeping throwing this error when got a corrupt record
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task got {*}KafkaException{*}, it only print a warn level log and return null.
> {code:java}
> @Override
> public List<SourceRecord> poll() {
>     if (!consumerAccess.tryAcquire()) {
>         return null;
>     }
>     if (stopping) {
>         return null;
>     }
>     try {
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
>         List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
>         ...
>         if (sourceRecords.isEmpty()) {
>             // WorkerSourceTasks expects non-zero batch size
>             return null;
>         } else {
>             log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
>             return sourceRecords;
>         }
>     } catch (WakeupException e) {
>         return null;
>     } catch (KafkaException e) {
>         log.warn("Failure during poll.", e);
>         return null;
>     } catch (Throwable e)  {
>         log.error("Failure during poll.", e);
>         // allow Connect to deal with the exception
>         throw e;
>     } finally {
>         consumerAccess.release();
>     }
> } {code}
> In the next poll, the consumer will keep throwing exception because it has received a corrupt record. This makes the  *MirrorSourceTask* cannot get next records and be blocked on the same offset.
> {code:java}
> private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
>     // Error when fetching the next record before deserialization.
>     if (corruptLastRecord)
>         throw new KafkaException("Received exception when fetching the next record from " + partition
>                                      + ". If needed, please seek past the record to "
>                                      + "continue consumption.", cachedRecordException);
> ...
> } {code}
> As this issue will not have any metrics to alert, after the retention time reaches, the records after the corrupt record in the source topic will lost and cannot be mirrored again.
> So it would be better that the mirror source task can throw the exception or expose some metrics for users to alert this kind of issue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)