Posted to user@flink.apache.org by Bill Wicker <Bi...@riskfocus.com> on 2020/02/14 23:20:06 UTC

KafkaFetcher closed before end of stream is received for all partitions.

Hello all,

I'm new to Flink and have been developing a series of POCs in preparation for a larger project that will use Flink. One use case we have is to run the same job for both batch and streaming processing, with Kafka as the source. When the job is run in batch mode we expect it to start, consume all data from the Kafka topic, and then shut down. I have achieved this using the isEndOfStream method of the KafkaDeserializationSchema; however, I have one outstanding problem that may be a bug.
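
Roughly, the schema looks like this (a simplified sketch only; the class name, marker value and String payload are placeholders, not our actual format):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class BoundedStringSchema implements KafkaDeserializationSchema<String> {

    // Placeholder convention: the last record written to each partition
    // carries this sentinel value in the message body.
    private static final String EOS_MARKER = "__END_OF_STREAM__";

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        return record.value() == null
                ? null
                : new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true here is what stops the subtask, which is the
        // behavior this question is about.
        return EOS_MARKER.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}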

The problem arises when the source parallelism is less than the topic's partition count. In this case, there will be a single KafkaFetcher instance per sub-task, and each instance may be responsible for multiple partitions. My producer publishes some data and then publishes a final message to each partition with a flag indicating it is the end of stream. When the KafkaFetcher receives an end of stream message it stops running and closes. However, the shutdown gets invoked when the first end of stream message is received, even though other partitions that the fetcher is responsible for may not yet have reached the end of stream. In this case, when the job is started again, some of the EOS messages remain on the topic and will immediately be consumed, causing some of the sub-tasks to exit before consuming any new data.
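
For context, the separate step that publishes the end-of-stream markers (the application mentioned in the assumptions below) looks roughly like this -- a sketch only; the topic name, bootstrap servers and marker value are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EosPublisher {
    public static void main(String[] args) {
        String topic = "my-topic"; // placeholder topic name
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<byte[], String> producer = new KafkaProducer<>(props)) {
            int partitions = producer.partitionsFor(topic).size();
            for (int p = 0; p < partitions; p++) {
                // Target each partition explicitly so every partition
                // receives exactly one end-of-stream marker.
                producer.send(new ProducerRecord<>(topic, p, (byte[]) null, "__END_OF_STREAM__"));
            }
            producer.flush();
        }
    }
}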

Is this the expected behavior, i.e. that each KafkaFetcher will stop running as soon as a single end of stream message is received rather than waiting for all partitions to receive one? If so, is there some other way to achieve my goal of having the application gracefully shutdown once all data has been consumed from the topic?

A few assumptions:

  1.  The producer is a legacy application that produces to Kafka and can't be changed
  2.  Once the producer is done publishing, another application will be invoked to publish the EOS messages and then launch the Flink job
  3.  The Flink job should exit automatically once all data has been consumed
  4.  There should not be any unconsumed EOS messages.

Thank you in advance!

Bill Wicker | +1 347-369-4580
Software Development Consultant, Risk Focus
New York | London | Riga | Pittsburgh | Toronto


RE: KafkaFetcher closed before end of stream is received for all partitions.

Posted by Bill Wicker <Bi...@riskfocus.com>.
Thanks for the update! Since we are still in the planning stage, I will try to find another way to achieve this in the meantime, and I'll keep an eye on that Jira. Two workarounds I'm considering: either match the source parallelism to the topic's partition count, or, since this is a batch process that will be triggered externally, pass in a timestamp to start consuming from so that any lingering end-of-stream messages from a previous run are skipped.
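
Roughly what I have in mind for the timestamp option (a sketch only; the class, topic and group names are placeholders, and the timestamp would be supplied by whatever triggers the batch run):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BoundedKafkaJob {
    public static void main(String[] args) throws Exception {
        // Epoch millis passed in by the application that launches the job.
        long jobStartTimestamp = Long.parseLong(args[0]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "bounded-job");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new BoundedStringSchema(), props);
        // Only records produced at or after this timestamp are read, so any
        // end-of-stream markers left over from an earlier run are skipped.
        consumer.setStartFromTimestamp(jobStartTimestamp);

        env.addSource(consumer)
           .print();

        env.execute("bounded kafka job");
    }
}

The other workaround would simply be calling setParallelism(partitionCount) on the source, at the cost of tying the job's parallelism to the topic layout.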

Bill Wicker

-----Original Message-----
From: Tzu-Li Tai <tz...@gmail.com> 
Sent: Monday, February 17, 2020 1:42 AM
To: user@flink.apache.org
Subject: Re: KafkaFetcher closed before end of stream is received for all partitions.


Re: KafkaFetcher closed before end of stream is received for all partitions.

Posted by Tzu-Li Tai <tz...@gmail.com>.
Hi,

Your observations are correct.
It is expected that the result of `KafkaDeserializationSchema#isEndOfStream`
triggers a single subtask to escape its fetch loop. Therefore, if a subtask
is assigned multiple partitions, as soon as one record (regardless of which
partition it came from) signals end of stream, then the subtask ends.

I'm afraid there is probably no good solution to this given the ill-defined
semantics of the `isEndOfStream` method. All reasonable approaches that come
to mind require some sort of external trigger to manually shut down the job.

For now, I've filed a JIRA to propose a possible solution to the semantics
of the method: https://issues.apache.org/jira/browse/FLINK-16112



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/