Posted to issues@flink.apache.org by "zoucan (Jira)" <ji...@apache.org> on 2022/05/25 08:46:00 UTC

[jira] [Comment Edited] (FLINK-27762) Kafka WakeUpException during handling splits changes

    [ https://issues.apache.org/jira/browse/FLINK-27762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541896#comment-17541896 ] 

zoucan edited comment on FLINK-27762 at 5/25/22 8:45 AM:
---------------------------------------------------------

[~martijnvisser] 

Thanks for your reply.

I have checked the source code of the latest version, and there is no difference in this part.

It's difficult to test, since this exception doesn't happen on every partition change.

In my opinion, it happens only if *org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.wakeUp()* is called while the consumer is polling data.

In this situation, the method above will eventually call {*}org.apache.kafka.clients.consumer.KafkaConsumer.wakeup(){*}.
{code:java}
// org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.wakeUp()

public void wakeUp() {
    wakeup = true;
    if (lastRecords == null) {
        // 1. The consumer is polling data, so call org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.wakeUp()
        splitReader.wakeUp();
    } else {
        elementsQueue.wakeUpPuttingThread(fetcherIndex);
    }
} {code}
 
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.wakeUp()

public void wakeUp() {
    // 2. execute org.apache.kafka.clients.consumer.KafkaConsumer.wakeup()
    consumer.wakeup();
} {code}
 

And in method {*}org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(){*}, *org.apache.kafka.clients.consumer.KafkaConsumer.position()* will be called.
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges()

public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
    // ignore irrelevant code...

    // 3. execute this.removeEmptySplits()
    removeEmptySplits();

    maybeLogSplitChangesHandlingResult(splitsChange);
} {code}
 
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits()

private void removeEmptySplits() {
    List<TopicPartition> emptyPartitions = new ArrayList<>();

    for (TopicPartition tp : consumer.assignment()) {
        // 4. execute org.apache.kafka.clients.consumer.KafkaConsumer.position();
        // since the consumer was woken up earlier, position() throws WakeupException.
        if (consumer.position(tp) >= getStoppingOffset(tp)) {
            emptyPartitions.add(tp);
        }
    }

    // ignore irrelevant code...
} {code}
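For illustration, one plausible mitigation for this race is to retry the interrupted call when a stale wakeup fires during split handling rather than during a poll. The sketch below is only my illustration, not Flink's actual fix; the WakeupException class and the Supplier-based call are stand-ins so the snippet runs without kafka-clients on the classpath:

```java
import java.util.function.Supplier;

public class RetryOnWakeupSketch {
    // Stand-in for org.apache.kafka.common.errors.WakeupException
    static class WakeupException extends RuntimeException {}

    // Retry a consumer call once if it is interrupted by a pending wakeup.
    static <T> T retryOnWakeup(Supplier<T> call) {
        try {
            return call.get();
        } catch (WakeupException e) {
            // The wakeup was aimed at a blocking poll, not this call;
            // throwing the exception clears the wakeup flag, so retry once.
            return call.get();
        }
    }

    public static void main(String[] args) {
        // Simulate a consumer.position()-style call that is woken up once.
        final boolean[] wokenUp = {true};
        long position = retryOnWakeup(() -> {
            if (wokenUp[0]) {
                wokenUp[0] = false;
                throw new WakeupException();
            }
            return 42L;
        });
        System.out.println(position); // prints 42
    }
}
```

In the scenario above, such a wrapper would sit around the consumer.position(tp) call in removeEmptySplits(), so a wakeup intended for an in-flight poll does not fail the splits-change handling.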
 

Since our application is already running in production, we must evaluate the risk of upgrading.

Anyway, I'll try to test with the latest version and share the result. If you have any suggestions, please share them with me.



> Kafka WakeUpException during handling splits changes
> ----------------------------------------------------
>
>                 Key: FLINK-27762
>                 URL: https://issues.apache.org/jira/browse/FLINK-27762
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.3
>            Reporter: zoucan
>            Priority: Major
>
>  
> We enable dynamic partition discovery in our flink job, but job failed when kafka partition is changed. 
> Exception detail is shown as follows:
> {code:java}
> java.lang.RuntimeException: One or more fetchers have encountered exception
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> 	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> 	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> 	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
> 	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more
> Caused by: org.apache.kafka.common.errors.WakeupException
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
> 	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
> 	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
> 	at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
> 	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> 	... 6 more {code}
>  
> After a preliminary investigation of the KafkaSource source code:
> At first:
> method *org.apache.kafka.clients.consumer.KafkaConsumer.wakeup()* will be called if the consumer is polling data.
> Later:
> method *org.apache.kafka.clients.consumer.KafkaConsumer.position()* will be called while handling splits changes.
> Since the consumer has been woken up, it will throw WakeupException.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)