Posted to user-zh@flink.apache.org by 邹璨 <zo...@navercorp.com> on 2022/05/25 03:35:02 UTC

WakeupException in Kafka source when a partition change is detected

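Flink version: 1.14.3
Module: connectors/kafka
Problem description:
A WakeupException occurred while I was using Kafka dynamic partition discovery, and it caused the job to fail. The exception is as follows: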

2022-05-10 15:08:03
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more



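After briefly going through the source code along the stack trace, I found that when KafkaSource handles a partition change, it calls consumer.wakeup to interrupt the consumer that is currently fetching records.
Later, while still handling the partition change, it calls consumer.position. Because the consumer has already been woken up, that call throws a WakeupException.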
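To make the sequence above concrete, here is a minimal, single-threaded Java model of it. `FakeConsumer`, `FakeWakeupException`, `positionWithRetry` and the partition name `topic-0` are hypothetical stand-ins, not Kafka or Flink APIs. The model assumes only the documented contract of `KafkaConsumer.wakeup()`: if no thread is blocked in a consumer call when `wakeup()` is invoked, the next such call throws a `WakeupException` instead, and the pending flag is then cleared (which appears to be what `ConsumerNetworkClient.maybeTriggerWakeup` in the stack trace does). The retry shown is one possible workaround under that assumption, not the fix adopted by Flink.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupRaceDemo {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException. */
    static class FakeWakeupException extends RuntimeException {
        FakeWakeupException() {
            super("consumer was woken up");
        }
    }

    /** Hypothetical consumer model with a one-shot wakeup flag (not the Kafka client). */
    static class FakeConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

        /** Sets the flag; the next "blocking" call will observe it. */
        void wakeup() {
            wakeupPending.set(true);
        }

        /** Checks and clears the flag, throwing if a wakeup was pending. */
        private void maybeTriggerWakeup() {
            if (wakeupPending.getAndSet(false)) {
                throw new FakeWakeupException();
            }
        }

        /** Models poll(): the call wakeup() is meant to interrupt. */
        void poll() {
            maybeTriggerWakeup();
        }

        /** Models position(): also checks the flag, so it can consume a stale wakeup. */
        long position(String partition) {
            maybeTriggerWakeup();
            return 0L; // assumption: every partition is at offset 0 in this model
        }
    }

    /** Reproduces the reported order: wakeup() lands before position(). */
    static boolean positionThrowsAfterWakeup() {
        FakeConsumer consumer = new FakeConsumer();
        consumer.wakeup(); // intended to interrupt poll()...
        try {
            consumer.position("topic-0"); // ...but position() observes it instead
            return false;
        } catch (FakeWakeupException e) {
            return true;
        }
    }

    /** Hypothetical workaround: the flag is one-shot, so retry the call once. */
    static long positionWithRetry(FakeConsumer consumer, String partition) {
        try {
            return consumer.position(partition);
        } catch (FakeWakeupException e) {
            // The first attempt cleared the pending wakeup, so this attempt proceeds.
            return consumer.position(partition);
        }
    }

    public static void main(String[] args) {
        System.out.println("position() threw: " + positionThrowsAfterWakeup());
        FakeConsumer consumer = new FakeConsumer();
        consumer.wakeup();
        System.out.println("position() with retry: " + positionWithRetry(consumer, "topic-0"));
    }
}
```

Running `main` prints `position() threw: true` and then `position() with retry: 0`. Retrying once is enough here only because the model is single-threaded and the flag is one-shot; in a real fetcher another `wakeup()` could arrive between the two calls, so an actual fix would have to account for that.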






This email and the information contained in this email are intended solely for the recipient(s) addressed above and may contain information that is confidential and/or privileged or whose disclosure is prohibited by law or other reasons.
If you are not the intended recipient of this email, please be advised that any unauthorized storage, duplication, dissemination, distribution or disclosure of all or part of this email is strictly prohibited.
If you received this email in error, please immediately contact NAVER Security (dl_naversecurity@navercorp.com) and delete this email and any copies and attachments from your system. Thank you for your cooperation.

Re: WakeupException in Kafka source when a partition change is detected

Posted by Qingsheng Ren <re...@gmail.com>.
Hi,

Thanks for the report. This looks like a bug; could you open a new ticket on Apache JIRA [1]?

[1] https://issues.apache.org/jira
