You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dong Lin <li...@gmail.com> on 2022/01/04 07:12:54 UTC

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Hi Arvid,

I spent time reading through the existing KafkaSource related code and
thinking about the best possible solution in the last few days. Now I no
longer think it is a good idea to let user specify this logic in
de-serializer and pass this information via the Collector. I also thought
more about the solution you suggested (e.g. merge the
message-payload-based stopping with the existing offset-based stopping
logic into one class) and preferred not to do this (yet) for the reasons
listed in here
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records#FLIP208:UpdateKafkaSourcetodetectEOFbasedondeserializedrecords-RejectedAlternatives>
.

I have documented another possible solution in FLIP-208 (link
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records#FLIP208:UpdateKafkaSourcetodetectEOFbasedondeserializedrecords-RejectedAlternatives>)
and have opened the discussion thread. Maybe we can continue the discussion
there.

Cheers,
Dong



On Wed, Dec 29, 2021 at 12:38 PM Dong Lin <li...@gmail.com> wrote:

> Hi Arvid,
>
> After discussing with Jiangjie offline, I agree using Collector::close()
> is not appropriate because we in general prefer close() to be called by one
> entity, in this case the Flink runtime. Having close() called by both user
> and Flink runtime could be error-prone, even though we can make it work
> with some extra work.
>
> I am now thinking about adding a public class (e.g. SourceCollector) that
> extends the existing Collector. SourceCollector::endOfStream() could be
> invoked by users to signal EOF. And users could optionally implement a
> KafkaRecordDeserializationSchema::deserialize(SourceCollector) if they
> want to do dynamic EOF.
>
> To make our discussion more efficient and possibly involve more people for
> comments, I will create a FLIP and open a discussion thread for the FLIP.
>
> Thanks,
> Dong
>
>
>
> On Tue, Dec 28, 2021 at 7:40 PM Dong Lin <li...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks a lot for the detailed reply.
>>
>> Just to clarify, I don't plan to ask user to implement
>> KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
>> not add any public API, but expect users to re-use the
>> existing Collector::close() API inside
>> KafkaRecordDeserializationSchema::deserialize(...). And if a message with
>> the user-specified pattern has arrived, the user can invoke
>> Collector::close() which signals Flink to stop reading from the
>> corresponding source split.
>>
>> Here are a few clarifications in response to the discussion:
>>
>> 1) The boundedness of the source and execution.runtime-mode would not be
>> affected by this proposal. Users can keep using the existing setting
>> without or without the dynamic EOF.
>> 2) The dynamic EOF works independently of the
>> stop stoppingOffsetsInitializer. When reading from Kafka Source, users can
>> rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
>> And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
>> job stops reading from the source split when either condition is met.
>> 3) Suppose users can specify the dynamic EOF in
>> KafkaRecordDeserializationSchema::deserialize(...), then users have access
>> to the entire ConsumerRecord. This approach could address Ayush's use-case.
>> 4) Suppose we choose to do it in
>> KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
>> happens inside the RecordEmitter. Yes we will need to be able to close the
>> split.
>> 5) For the majority of users who do not want dynamic EOF, those users can
>> keep using the existing out-of-the-box support for Avro/Json/Protobuf.
>> For advanced users who want dynamic EOF, those users anyway need to encode
>> the dynamic EOF logic in a method similar to
>> KafkaRecordDeserializationSchema (with access to the raw message). Adding
>> the dynamic EOF support would not make their life harder.
>>
>> Based on the discussion so far, it looks like there are two approaches
>> mentioned so far:
>>
>> 1) Let users call Collector::close() API inside
>> KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.
>>
>> 2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor),
>> where StopCursor subsumes all existing functionalities of
>> the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
>> both the raw and the deserialized message.
>>
>> It seems that the second approach involves much more API change than the
>> first work (including deprecation of some existing APIs).
>>
>> Regarding the first approach, could you help explain why "close is the
>> completely wrong method for that"? My understanding is the close() method
>> indicates that the caller no longer needs to read from this source split
>> and the associated network resource could be released. Why is it wrong for
>> a user to call this method?
>>
>>
>> On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Dong,
>>>
>>> Could you help explain why we can not dynamically stop reading from a
>>>> source in batch mode?
>>>>
>>> We can but we cannot easily determine if the source is supposed to run
>>> in batch or streaming. A user would need to implement a special
>>> KafkaRecordDeserializationSchema and still provide an OffsetInitializer for
>>> the end offset to trigger batch mode.
>>>
>>> How are both concepts supposed to interact? Are we only stopping if any
>>> of the concept state that this is the end?
>>>
>>> We could ofc offer some KafkaSourceBuilde#setBounded() without
>>> parameters so that a user can implement a special
>>> KafkaRecordDeserializationSchema and notify the builder but this looks
>>> awkward to me and is quite error-prone: When a user uses setBounded without
>>> overwriting isEndOfStream, the application would never emit anything.
>>>
>>> My understanding is that when a message with the particular pattern
>>>> (specified by the user) is encountered, we can have the source operator
>>>> emit the high-watermark in such a way as if the particular partition of
>>>> this source has reached EOF. And this must have worked since users have
>>>> been using KafkaDeserializationSchema::isEndOfStream with the
>>>> legacy FlinkKafkaConsumer. Did I miss something here?
>>>>
>>> Yes batch mode is different from bounded streaming. [1] We can only
>>> fully leverage a statically bounded source by statically defining it as
>>> such with the FLIP-27 Source interface. [2]
>>>
>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>>>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>>>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>>>> harder for user to implement KafkaRecordDeserializationSchema?
>>>>
>>> Users mostly use the factory methods that adapt to Flink's
>>> DeserializationSchema. We should also offer a builder similarly to
>>> KafkaRecordSerializationSchemaBuilder.
>>>
>>> Regarding "how to use it from Table/SQL", support we allow user to
>>>> encode this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>>>
>>> I'm not sure if we can/should expose dynamic EOF in SQL but at the very
>>> least we should properly support end offsets (as it's now possible). We
>>> must avoid removing the current end offsets in favor of
>>> KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
>>> bounded Kafka sources.
>>>
>>> e.g. call Collector::close() if the message content matches a
>>>> user-specified pattern
>>>>
>>> No, close is the completely wrong method for that. This method should
>>> have never been exposed to the user as it will close the network resources.
>>> However, we need a fully functional network stack for proper shutdown.
>>>
>>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>>>> user could implement the dynamic EOF logic in this method, I am worried
>>>> that this approach would lead to inferior performance due to double message
>>>> deserialization.
>>>>
>>> That is a fair point. In case of Ayush, however, it's the only way to
>>> determine that the pipeline should stop (you pretty much compare if the 5.
>>> byte in the message has changed). If you deserialize into a SpecificRecord,
>>> then the writer schema version is lost for isEndOfStream(T deserialized).
>>>
>>> Another concern I have for
>>> KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
>>> to be called then. If it's in the RecordEmitter, we need to extend the
>>> RecordEmitter to support closing the split. If it's in the SplitReader, we
>>> probably also need double-deserialization because of FLINK-25132 (the
>>> record needs to be deserialized in the RecordEmitter). Maybe you can encode
>>> it in the SplitState but this sounds rather cumbersome if it needs to be
>>> done for all sources.
>>>
>>> The reason is that the user's logic will likely depend on the
>>>> de-serialized message (as opposed to the raw byte in the
>>>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>>>> need to deserialize the message inside StopCursor::shouldStop(...) first
>>>> and then the message would be de-serialized again by
>>>> the PulsarDeserializationSchema, which is specified via
>>>> the PulsarSourceBuilder::setDeserializationSchema.
>>>>
>>> As written before, this is not the case of the specific user. Having the
>>> raw message makes it much easier to determine a writer schema change. I'm
>>> sure that there are cases, where you need to look into the data though. To
>>> avoid double-deserialization, a better way may be to pass both the raw and
>>> the deserialized message to `shouldStop` but then we should move the stop
>>> logic to RecordEmitter as written before.
>>>
>>> Do you mean that you prefer to replace
>>>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>>>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>>
>>> Ideally, yes. But that needs to be backward compatible as it's a
>>> PublicEvolving interface.
>>>
>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>>>> use adminClient only without using KafkaClient. On the other hand, it seems
>>>> that there is no performance/correctness concern with the existing
>>>> approach? Is this issue related to the discussion of dynamic EOF?
>>>
>>> I just meant that a user probably only needs access to the adminClient
>>> to retrieve the offsets of a topic and that Kafka's
>>> PartitionOffsetsRetriever nicely hides the client from the user. I'm sure a
>>> user can easily mess up with the admin client in Pulsar (what happens if
>>> this is closed, is the client internally used somewhere else?).
>>>
>>>
>>> TL;DR
>>> In general, I like the separation of concerns:
>>> KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
>>> however we call it) is for stopping. Then a user can reuse pre-defined
>>> KafkaRecordDeserializationSchema and mix in the stopping logic when needed
>>> (remember this is a rare case).
>>>
>>> In most cases, a user will use Avro/Json/Protobuf + schema registry, so
>>> if we provide out-of-the-box support for these formats, the user doesn't
>>> need to touch KafkaRecordDeserializationSchema at all. Then it would be
>>> nice to have a single interface that determines when the source stops (e.g.
>>> StopCondition) with pre-defined implementations (see factory methods in
>>> OffsetsInitializer) for Table/SQL. We could even provide a predefined
>>> strategy for schema changes when the schema registry is used.
>>>
>>> If you already have use-cases that relies on the deserialized data, then
>>> let's move the stopping logic to RecordEmitter. At this point, I'd propose
>>> to pass the raw and deserialized data to the StopCondition.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification
>>>
>>>
>>> On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <li...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> Thanks for the suggestion! Sorry for the late reply. I just finished
>>>> investigating the PulsarSource/StopCursor as you suggested. Please see
>>>> my reply inline.
>>>>
>>>> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Dong,
>>>>>
>>>>> I see your point. The main issue with dynamic EOF is that we can't run
>>>>> in batch mode. That may be desired in the case of Ayush but there may be
>>>>> other use cases where it's not.
>>>>>
>>>>
>>>> Could you help explain why we can not dynamically stop reading from a
>>>> source in batch mode?
>>>>
>>>> My understanding is that when a message with the particular pattern
>>>> (specified by the user) is encountered, we can have the source operator
>>>> emit the high-watermark in such a way as if the particular partition of
>>>> this source has reached EOF. And this must have worked since users have
>>>> been using KafkaDeserializationSchema::isEndOfStream with the
>>>> legacy FlinkKafkaConsumer. Did I miss something here?
>>>>
>>>> Additionally, it's quite a bit of code if you'd implement a
>>>>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>>>>> on how to use it from Table/SQL.
>>>>>
>>>>
>>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>>>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>>>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>>>> harder for user to implement KafkaRecordDeserializationSchema?
>>>>
>>>> Regarding "how to use it from Table/SQL", support we allow user to
>>>> encode this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g.
>>>> call Collector::close() if the message content matches a user-specified
>>>> pattern), then effect of this change is same as if the partition has
>>>> reached EOF, and Table/SQL can handle this effect as they are doing now
>>>> without any extra change. Does this make sense?
>>>>
>>>>
>>>>>
>>>>> I think we should get inspired on how PulsarSource is solving it. They
>>>>> have an orthogonal interface StopCursor (we could call it StopCondition)
>>>>> [1]. It has some default values (I wonder if we could implement them as
>>>>> enums for easier Table integration).
>>>>>
>>>>
>>>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>>>> user could implement the dynamic EOF logic in this method, I am worried
>>>> that this approach would lead to inferior performance due to double message
>>>> deserialization.
>>>>
>>>> The reason is that the user's logic will likely depend on the
>>>> de-serialized message (as opposed to the raw byte in the
>>>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>>>> need to deserialize the message inside StopCursor::shouldStop(...) first
>>>> and then the message would be de-serialized again by
>>>> the PulsarDeserializationSchema, which is specified via
>>>> the PulsarSourceBuilder::setDeserializationSchema.
>>>>
>>>> In comparison, messages can be deserialized only once if we allow users
>>>> to specify the dynamic EOF logic inside
>>>> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>>>>
>>>>
>>>>> Ideally, this interface would subsume OffsetsInitializer on stopping
>>>>> side. I think it was not wise to use OffsetsInitializer also for stop
>>>>> offsets as things like OffsetResetStrategy do not make any sense.
>>>>>
>>>>
>>>> Do you mean that you prefer to replace
>>>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>>>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>>
>>>> Without digging into detail whether this replacement is feasible, I
>>>> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
>>>> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
>>>> due to the double serialization issue described above), I guess it is
>>>> probably simpler to separate this from the discussion of the dynamic EOF?
>>>>
>>>>
>>>>>
>>>>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid
>>>>> having to hand in the KafkaClient (as we do in Pulsar).
>>>>>
>>>>
>>>> Do you mean that you prefer to remove KafkaClient from
>>>> PartitionOffsetsRetrieverImpl?
>>>>
>>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>>>> use adminClient only without using KafkaClient. On the other hand, it seems
>>>> that there is no performance/correctness concern with the existing
>>>> approach? Is this issue related to the discussion of dynamic EOF?
>>>>
>>>>
>>>>> I hope I gave some pointers.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>>>>
>>>>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:
>>>>>
>>>>>> Yep,  dynamic schema change could be a good solution for the
>>>>>> particular use-case mentioned by Ayush.
>>>>>>
>>>>>> On the other hand, I have heard of valid use-cases where we want to
>>>>>> stop the job based on a control message. For example, let's say we have a
>>>>>> Flink job that keeps processing stock transaction data fetched from Kafka
>>>>>> in real time. Suppose the stock market closes at 4pm, we probably want the
>>>>>> Flink job to stop after it has processed all the transaction data of that
>>>>>> day, instead of running it for the whole day, in order to save CPU cost.
>>>>>>
>>>>>> As of Flink 1.13, users can achieve this goal by sending a special
>>>>>> message to the Kafka topic, and encode logic in the deserializer such that
>>>>>> Flink job stops when this message is observed. IMO, this seems like a
>>>>>> reasonable approach to support the above use-case.
>>>>>>
>>>>>> One possible approach to keep supporting this use-case in Flink 1.15
>>>>>> is to allow user to signal the "end of stream" by calling
>>>>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Wouldn't it be better to ask the Iceberg maintainers to support
>>>>>>> dynamic schema change?
>>>>>>>
>>>>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Ayush,
>>>>>>>>
>>>>>>>> Your use-case should be supported.  Sorry, we don't have a good way
>>>>>>>> to support this in Flink 1.14.
>>>>>>>>
>>>>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <
>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>
>>>>>>>>> My usecase is that as soon as the avro message version is changed,
>>>>>>>>> I want to reload the job graph so that I can update the downstream iceberg
>>>>>>>>> table.
>>>>>>>>>
>>>>>>>>> Iceberg FlinkSink take table schema during the job start and
>>>>>>>>> cannot be updated during runtime. So, I want to trigger graceful shutdown
>>>>>>>>> and restart the job.
>>>>>>>>>
>>>>>>>>> Can I reload the job graph to achieve that?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ayush,
>>>>>>>>>>
>>>>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>>>>> Kafka. For new Kafka source, the recommended way is to use the bounded mode
>>>>>>>>>> like this
>>>>>>>>>>
>>>>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>>>>> ...
>>>>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>>>>                 .build();
>>>>>>>>>>
>>>>>>>>>> You can implement your own OffsetsInitializer or use a provided
>>>>>>>>>> one.
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>>>>>
>>>>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>>>>>
>>>>>>>>>>> Ayush Chauhan <ay...@zomato.com> 于2021年12月8日周三 15:29写道:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>>>>> metrobert@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Ayush,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you
>>>>>>>>>>>>> send me a link to it?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you please let me know the alternatives
>>>>>>>>>>>>>> of isEndOfStream() as now according to docs this method will no longer be
>>>>>>>>>>>>>> used to determine the end of the stream.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>  Ayush Chauhan
>>>>>>>>>>>>>>  Data Platform
>>>>>>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This email is intended only for the person or the entity to
>>>>>>>>>>>>>> whom it is addressed. If you are not the intended recipient, please delete
>>>>>>>>>>>>>> this email and contact the sender.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>  Ayush Chauhan
>>>>>>>>>>>>  Data Platform
>>>>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This email is intended only for the person or the entity to
>>>>>>>>>>>> whom it is addressed. If you are not the intended recipient, please delete
>>>>>>>>>>>> this email and contact the sender.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>  Ayush Chauhan
>>>>>>>>>  Data Platform
>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This email is intended only for the person or the entity to whom
>>>>>>>>> it is addressed. If you are not the intended recipient, please delete this
>>>>>>>>> email and contact the sender.
>>>>>>>>>
>>>>>>>>