Posted to user@flink.apache.org by Ayush Chauhan <ay...@zomato.com> on 2021/12/07 09:39:57 UTC

Alternatives of KafkaDeserializationSchema.isEndOfStream()

Hi,

Can you please let me know the alternatives of isEndOfStream() as now
according to docs this method will no longer be used to determine the end
of the stream.

-- 
 Ayush Chauhan
 Data Platform
 +91 9990747111

This email is intended only for the person or the entity to 
whom it is addressed. If you are not the intended recipient, please delete 
this email and contact the sender.


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Dong Lin <li...@gmail.com>.
Hi Arvid,

I spent time reading through the existing KafkaSource-related code and
thinking about the best possible solution over the last few days. I no
longer think it is a good idea to let users specify this logic in the
deserializer and pass this information via the Collector. I also thought
more about the solution you suggested (i.e. merging the
message-payload-based stopping with the existing offset-based stopping
logic into one class) and would prefer not to do this (yet), for the reasons
listed here
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records#FLIP208:UpdateKafkaSourcetodetectEOFbasedondeserializedrecords-RejectedAlternatives>
.

I have documented another possible solution in FLIP-208 (link
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records#FLIP208:UpdateKafkaSourcetodetectEOFbasedondeserializedrecords-RejectedAlternatives>)
and have opened the discussion thread. Maybe we can continue the discussion
there.

Cheers,
Dong



On Wed, Dec 29, 2021 at 12:38 PM Dong Lin <li...@gmail.com> wrote:

> Hi Arvid,
>
> After discussing with Jiangjie offline, I agree using Collector::close()
> is not appropriate because we in general prefer close() to be called by one
> entity, in this case the Flink runtime. Having close() called by both user
> and Flink runtime could be error-prone, even though we can make it work
> with some extra work.
>
> I am now thinking about adding a public class (e.g. SourceCollector) that
> extends the existing Collector. SourceCollector::endOfStream() could be
> invoked by users to signal EOF. And users could optionally implement a
> KafkaRecordDeserializationSchema::deserialize(SourceCollector) if they
> want to do dynamic EOF.
>
> To make our discussion more efficient and possibly involve more people for
> comments, I will create a FLIP and open a discussion thread for the FLIP.
>
> Thanks,
> Dong
>
>
>
> On Tue, Dec 28, 2021 at 7:40 PM Dong Lin <li...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks a lot for the detailed reply.
>>
>> Just to clarify, I don't plan to ask users to implement
>> KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
>> not add any public API, but expect users to re-use the
>> existing Collector::close() API inside
>> KafkaRecordDeserializationSchema::deserialize(...). And if a message with
>> the user-specified pattern has arrived, the user can invoke
>> Collector::close() which signals Flink to stop reading from the
>> corresponding source split.
>>
>> Here are a few clarifications in response to the discussion:
>>
>> 1) The boundedness of the source and execution.runtime-mode would not be
>> affected by this proposal. Users can keep using the existing setting
>> with or without the dynamic EOF.
>> 2) The dynamic EOF works independently of the
>> stoppingOffsetsInitializer. When reading from the Kafka source, users can
>> rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
>> And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
>> job stops reading from the source split when either condition is met.
>> 3) Suppose users can specify the dynamic EOF in
>> KafkaRecordDeserializationSchema::deserialize(...), then users have access
>> to the entire ConsumerRecord. This approach could address Ayush's use-case.
>> 4) Suppose we choose to do it in
>> KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
>> happens inside the RecordEmitter. Yes we will need to be able to close the
>> split.
>> 5) For the majority of users who do not want dynamic EOF, those users can
>> keep using the existing out-of-the-box support for Avro/Json/Protobuf.
>> For advanced users who want dynamic EOF, those users anyway need to encode
>> the dynamic EOF logic in a method similar to
>> KafkaRecordDeserializationSchema (with access to the raw message). Adding
>> the dynamic EOF support would not make their life harder.
>>
>> Based on the discussion so far, it looks like two approaches have been
>> mentioned:
>>
>> 1) Let users call Collector::close() API inside
>> KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.
>>
>> 2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor),
>> where StopCursor subsumes all existing functionalities of
>> the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
>> both the raw and the deserialized message.
>>
>> It seems that the second approach involves much more API change than the
>> first one (including deprecation of some existing APIs).
>>
>> Regarding the first approach, could you help explain why "close is the
>> completely wrong method for that"? My understanding is the close() method
>> indicates that the caller no longer needs to read from this source split
>> and the associated network resource could be released. Why is it wrong for
>> a user to call this method?
>>
>>
>> On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Dong,
>>>
>>> Could you help explain why we can not dynamically stop reading from a
>>>> source in batch mode?
>>>>
>>> We can, but we cannot easily determine whether the source is supposed to run
>>> in batch or streaming mode. A user would need to implement a special
>>> KafkaRecordDeserializationSchema and still provide an OffsetsInitializer for
>>> the end offset to trigger batch mode.
>>>
>>> How are both concepts supposed to interact? Do we stop as soon as either
>>> concept states that this is the end?
>>>
>>> We could of course offer some KafkaSourceBuilder#setBounded() without
>>> parameters so that a user can implement a special
>>> KafkaRecordDeserializationSchema and notify the builder, but this looks
>>> awkward to me and is quite error-prone: when a user uses setBounded without
>>> overriding isEndOfStream, the application would never emit anything.
>>>
>>> My understanding is that when a message with the particular pattern
>>>> (specified by the user) is encountered, we can have the source operator
>>>> emit the high-watermark in such a way as if the particular partition of
>>>> this source has reached EOF. And this must have worked since users have
>>>> been using KafkaDeserializationSchema::isEndOfStream with the
>>>> legacy FlinkKafkaConsumer. Did I miss something here?
>>>>
>>> Yes, batch mode is different from bounded streaming. [1] We can only
>>> fully leverage a statically bounded source by statically defining it as
>>> such with the FLIP-27 Source interface. [2]
>>>
>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>>>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>>>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>>>> harder for users to implement KafkaRecordDeserializationSchema?
>>>>
>>> Users mostly use the factory methods that adapt to Flink's
>>> DeserializationSchema. We should also offer a builder similar to
>>> KafkaRecordSerializationSchemaBuilder.
>>>
>>> Regarding "how to use it from Table/SQL", suppose we allow users to
>>>> encode this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>>>
>>> I'm not sure if we can/should expose dynamic EOF in SQL but at the very
>>> least we should properly support end offsets (as it's now possible). We
>>> must avoid removing the current end offsets in favor of
>>> KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
>>> bounded Kafka sources.
>>>
>>> e.g. call Collector::close() if the message content matches a
>>>> user-specified pattern
>>>>
>>> No, close is the completely wrong method for that. This method should
>>> have never been exposed to the user as it will close the network resources.
>>> However, we need a fully functional network stack for proper shutdown.
>>>
>>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>>>> user could implement the dynamic EOF logic in this method, I am worried
>>>> that this approach would lead to inferior performance due to double message
>>>> deserialization.
>>>>
>>> That is a fair point. In Ayush's case, however, it's the only way to
>>> determine that the pipeline should stop (you pretty much check whether the 5th
>>> byte in the message has changed). If you deserialize into a SpecificRecord,
>>> then the writer schema version is lost for isEndOfStream(T deserialized).
>>>
>>> Another concern I have for
>>> KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
>>> to be called then. If it's in the RecordEmitter, we need to extend the
>>> RecordEmitter to support closing the split. If it's in the SplitReader, we
>>> probably also need double-deserialization because of FLINK-25132 (the
>>> record needs to be deserialized in the RecordEmitter). Maybe you can encode
>>> it in the SplitState but this sounds rather cumbersome if it needs to be
>>> done for all sources.
>>>
>>> The reason is that the user's logic will likely depend on the
>>>> de-serialized message (as opposed to the raw byte in the
>>>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>>>> need to deserialize the message inside StopCursor::shouldStop(...) first
>>>> and then the message would be de-serialized again by
>>>> the PulsarDeserializationSchema, which is specified via
>>>> the PulsarSourceBuilder::setDeserializationSchema.
>>>>
>>> As written before, this is not the case for this specific user. Having the
>>> raw message makes it much easier to detect a writer schema change. I'm
>>> sure that there are cases where you need to look into the data, though. To
>>> avoid double deserialization, a better way may be to pass both the raw and
>>> the deserialized message to `shouldStop`, but then we should move the stop
>>> logic to the RecordEmitter, as written before.
>>>
>>> Do you mean that you prefer to replace
>>>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>>>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>>
>>> Ideally, yes. But that needs to be backward compatible as it's a
>>> PublicEvolving interface.
>>>
>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>>>> use adminClient only without using KafkaClient. On the other hand, it seems
>>>> that there is no performance/correctness concern with the existing
>>>> approach? Is this issue related to the discussion of dynamic EOF?
>>>
>>> I just meant that a user probably only needs access to the adminClient
>>> to retrieve the offsets of a topic and that Kafka's
>>> PartitionOffsetsRetriever nicely hides the client from the user. I'm sure a
>>> user can easily mess things up with the admin client in Pulsar (what happens
>>> if it is closed? Is the client used internally somewhere else?).
>>>
>>>
>>> TL;DR
>>> In general, I like the separation of concerns:
>>> KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
>>> however we call it) is for stopping. Then a user can reuse pre-defined
>>> KafkaRecordDeserializationSchema and mix in the stopping logic when needed
>>> (remember this is a rare case).
>>>
>>> In most cases, a user will use Avro/Json/Protobuf + schema registry, so
>>> if we provide out-of-the-box support for these formats, the user doesn't
>>> need to touch KafkaRecordDeserializationSchema at all. Then it would be
>>> nice to have a single interface that determines when the source stops (e.g.
>>> StopCondition) with pre-defined implementations (see factory methods in
>>> OffsetsInitializer) for Table/SQL. We could even provide a predefined
>>> strategy for schema changes when the schema registry is used.
>>>
>>> If you already have use cases that rely on the deserialized data, then
>>> let's move the stopping logic to the RecordEmitter. At this point, I'd propose
>>> to pass the raw and deserialized data to the StopCondition.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification
>>>
>>>
>>> On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <li...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> Thanks for the suggestion! Sorry for the late reply. I just finished
>>>> investigating the PulsarSource/StopCursor as you suggested. Please see
>>>> my reply inline.
>>>>
>>>> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Dong,
>>>>>
>>>>> I see your point. The main issue with dynamic EOF is that we can't run
>>>>> in batch mode. That may be desired in the case of Ayush but there may be
>>>>> other use cases where it's not.
>>>>>
>>>>
>>>> Could you help explain why we can not dynamically stop reading from a
>>>> source in batch mode?
>>>>
>>>> My understanding is that when a message with the particular pattern
>>>> (specified by the user) is encountered, we can have the source operator
>>>> emit the high-watermark in such a way as if the particular partition of
>>>> this source has reached EOF. And this must have worked since users have
>>>> been using KafkaDeserializationSchema::isEndOfStream with the
>>>> legacy FlinkKafkaConsumer. Did I miss something here?
>>>>
>>>> Additionally, it's quite a bit of code if you'd implement a
>>>>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>>>>> to use it from Table/SQL.
>>>>>
>>>>
>>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>>>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>>>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>>>> harder for users to implement KafkaRecordDeserializationSchema?
>>>>
>>>> Regarding "how to use it from Table/SQL", suppose we allow users to
>>>> encode this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g.
>>>> call Collector::close() if the message content matches a user-specified
>>>> pattern). Then the effect of this change is the same as if the partition had
>>>> reached EOF, and Table/SQL can handle this effect as it does now,
>>>> without any extra change. Does this make sense?
>>>>
>>>>
>>>>>
>>>>> I think we should take inspiration from how PulsarSource solves it. They
>>>>> have an orthogonal interface StopCursor (we could call it StopCondition)
>>>>> [1]. It has some default values (I wonder if we could implement them as
>>>>> enums for easier Table integration).
>>>>>
>>>>
>>>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>>>> user could implement the dynamic EOF logic in this method, I am worried
>>>> that this approach would lead to inferior performance due to double message
>>>> deserialization.
>>>>
>>>> The reason is that the user's logic will likely depend on the
>>>> de-serialized message (as opposed to the raw byte in the
>>>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>>>> need to deserialize the message inside StopCursor::shouldStop(...) first
>>>> and then the message would be de-serialized again by
>>>> the PulsarDeserializationSchema, which is specified via
>>>> the PulsarSourceBuilder::setDeserializationSchema.
>>>>
>>>> In comparison, messages can be deserialized only once if we allow users
>>>> to specify the dynamic EOF logic inside
>>>> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>>>>
>>>>
>>>>> Ideally, this interface would subsume OffsetsInitializer on the stopping
>>>>> side. I think it was not wise to also use OffsetsInitializer for stop
>>>>> offsets, as things like OffsetResetStrategy do not make any sense there.
>>>>>
>>>>
>>>> Do you mean that you prefer to replace
>>>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>>>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>>
>>>> Without digging into the details of whether this replacement is feasible, I
>>>> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
>>>> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
>>>> due to the double deserialization issue described above), I guess it is
>>>> probably simpler to separate this from the discussion of the dynamic EOF?
>>>>
>>>>
>>>>>
>>>>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid
>>>>> having to hand in the KafkaClient (as we do in Pulsar).
>>>>>
>>>>
>>>> Do you mean that you prefer to remove KafkaClient from
>>>> PartitionOffsetsRetrieverImpl?
>>>>
>>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>>>> use adminClient only without using KafkaClient. On the other hand, it seems
>>>> that there is no performance/correctness concern with the existing
>>>> approach? Is this issue related to the discussion of dynamic EOF?
>>>>
>>>>
>>>>> I hope I gave some pointers.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>>>>
>>>>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:
>>>>>
>>>>>> Yep, dynamic schema change could be a good solution for the
>>>>>> particular use-case mentioned by Ayush.
>>>>>>
>>>>>> On the other hand, I have heard of valid use-cases where we want to
>>>>>> stop the job based on a control message. For example, let's say we have a
>>>>>> Flink job that keeps processing stock transaction data fetched from Kafka
>>>>>> in real time. Suppose the stock market closes at 4pm, we probably want the
>>>>>> Flink job to stop after it has processed all the transaction data of that
>>>>>> day, instead of running it for the whole day, in order to save CPU cost.
>>>>>>
>>>>>> As of Flink 1.13, users can achieve this goal by sending a special
>>>>>> message to the Kafka topic and encoding logic in the deserializer such that
>>>>>> the Flink job stops when this message is observed. IMO, this seems like a
>>>>>> reasonable approach to support the above use-case.
>>>>>>
>>>>>> One possible approach to keep supporting this use-case in Flink 1.15
>>>>>> is to allow users to signal the "end of stream" by calling
>>>>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Wouldn't it be better to ask the Iceberg maintainers to support
>>>>>>> dynamic schema change?
>>>>>>>
>>>>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Ayush,
>>>>>>>>
>>>>>>>> Your use-case should be supported.  Sorry, we don't have a good way
>>>>>>>> to support this in Flink 1.14.
>>>>>>>>
>>>>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <
>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>
>>>>>>>>> My use case is that as soon as the Avro message version changes,
>>>>>>>>> I want to reload the job graph so that I can update the downstream Iceberg
>>>>>>>>> table.
>>>>>>>>>
>>>>>>>>> Iceberg's FlinkSink takes the table schema at job start, and it
>>>>>>>>> cannot be updated at runtime. So, I want to trigger a graceful shutdown
>>>>>>>>> and restart the job.
>>>>>>>>>
>>>>>>>>> Can I reload the job graph to achieve that?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ayush,
>>>>>>>>>>
>>>>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>>>>> Kafka. For the new Kafka source, the recommended way is to use the bounded
>>>>>>>>>> mode like this:
>>>>>>>>>>
>>>>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>>>>> ...
>>>>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>>>>                 .build();
>>>>>>>>>>
>>>>>>>>>> You can implement your own OffsetsInitializer or use a provided
>>>>>>>>>> one.
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is no way to end the Kafka stream from the deserializer.
>>>>>>>>>>>
>>>>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>>>>> need to end the Kafka stream without using the offset?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 8, 2021 at 15:29, Ayush Chauhan <ay...@zomato.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>>>>> metrobert@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Ayush,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you
>>>>>>>>>>>>> send me a link to it?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you please let me know the alternatives
>>>>>>>>>>>>>> of isEndOfStream() as now according to docs this method will no longer be
>>>>>>>>>>>>>> used to determine the end of the stream.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Dong Lin <li...@gmail.com>.
Hi Arvid,

After discussing with Jiangjie offline, I agree using Collector::close() is
not appropriate because we in general prefer close() to be called by one
entity, in this case the Flink runtime. Having close() called by both user
and Flink runtime could be error-prone, even though we can make it work
with some extra work.

I am now thinking about adding a public class (e.g. SourceCollector) that
extends the existing Collector. SourceCollector::endOfStream() could be
invoked by users to signal EOF. And users could optionally implement a
KafkaRecordDeserializationSchema::deserialize(SourceCollector) if they want
to do dynamic EOF.
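
A minimal sketch of what this idea could look like, written against
stand-in types rather than the real Flink classes. Everything here is an
assumption for illustration: the local Collector interface only mimics the
collect() part of Flink's Collector, SourceCollector::endOfStream() exists
only as a proposal in this thread, and the "END" control message, the
RecordDeserializer interface and the readUntilEof helper are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DynamicEofSketch {

    /** Stand-in for Flink's Collector; only the part this sketch needs. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical interface from the proposal: a Collector that can also signal EOF. */
    interface SourceCollector<T> extends Collector<T> {
        void endOfStream();
    }

    /** Hypothetical deserializer shape: emits records and may signal EOF via the collector. */
    interface RecordDeserializer<T> {
        void deserialize(byte[] rawValue, SourceCollector<T> out);
    }

    /** Collects records in memory and remembers whether EOF was signalled. */
    static final class ListSourceCollector<T> implements SourceCollector<T> {
        final List<T> records = new ArrayList<>();
        boolean ended = false;

        @Override
        public void collect(T record) {
            records.add(record);
        }

        @Override
        public void endOfStream() {
            ended = true;
        }
    }

    /** Treats the made-up control message "END" as the end of the split. */
    static final RecordDeserializer<String> STOP_ON_END = (rawValue, out) -> {
        String value = new String(rawValue, StandardCharsets.UTF_8);
        if ("END".equals(value)) {
            out.endOfStream(); // signal EOF instead of emitting the control message
        } else {
            out.collect(value);
        }
    };

    /** Simulates the runtime side: feeds messages until the deserializer signals EOF. */
    static List<String> readUntilEof(List<String> messages) {
        ListSourceCollector<String> out = new ListSourceCollector<>();
        for (String message : messages) {
            STOP_ON_END.deserialize(message.getBytes(StandardCharsets.UTF_8), out);
            if (out.ended) {
                break; // the runtime, not the user code, stops reading the split
            }
        }
        return out.records;
    }

    public static void main(String[] args) {
        System.out.println(readUntilEof(List.of("a", "b", "END", "c")));
    }
}
```

The point of the shape is that user code only raises a flag through
endOfStream(), while releasing resources stays with the runtime, which is
the concern with Collector::close() described above.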

To make our discussion more efficient and possibly involve more people for
comments, I will create a FLIP and open a discussion thread for the FLIP.

Thanks,
Dong



On Tue, Dec 28, 2021 at 7:40 PM Dong Lin <li...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks a lot for the detailed reply.
>
> Just to clarify, I don't plan to ask users to implement
> KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
> not add any public API, but expect users to re-use the
> existing Collector::close() API inside
> KafkaRecordDeserializationSchema::deserialize(...). And if a message with
> the user-specified pattern has arrived, the user can invoke
> Collector::close() which signals Flink to stop reading from the
> corresponding source split.
>
> Here are a few clarifications in response to the discussion:
>
> 1) The boundedness of the source and execution.runtime-mode would not be
> affected by this proposal. Users can keep using the existing setting
> with or without the dynamic EOF.
> 2) The dynamic EOF works independently of the
> stoppingOffsetsInitializer. When reading from the Kafka source, users can
> rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
> And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
> job stops reading from the source split when either condition is met.
> 3) Suppose users can specify the dynamic EOF in
> KafkaRecordDeserializationSchema::deserialize(...), then users have access
> to the entire ConsumerRecord. This approach could address Ayush's use-case.
> 4) Suppose we choose to do it in
> KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
> happens inside the RecordEmitter. Yes we will need to be able to close the
> split.
> 5) For the majority of users who do not want dynamic EOF, those users can
> keep using the existing out-of-the-box support for Avro/Json/Protobuf.
> For advanced users who want dynamic EOF, those users anyway need to encode
> the dynamic EOF logic in a method similar to
> KafkaRecordDeserializationSchema (with access to the raw message). Adding
> the dynamic EOF support would not make their life harder.
>
> Based on the discussion so far, it looks like two approaches have been
> mentioned:
>
> 1) Let users call Collector::close() API inside
> KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.
>
> 2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor), where
> StopCursor subsumes all existing functionalities of
> the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
> both the raw and the deserialized message.
>
> It seems that the second approach involves much more API change than the
> first one (including deprecation of some existing APIs).
>
> Regarding the first approach, could you help explain why "close is the
> completely wrong method for that"? My understanding is the close() method
> indicates that the caller no longer needs to read from this source split
> and the associated network resource could be released. Why is it wrong for
> a user to call this method?
>
>
> On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Dong,
>>
>> Could you help explain why we can not dynamically stop reading from a
>>> source in batch mode?
>>>
>> We can, but we cannot easily determine whether the source is supposed to run
>> in batch or streaming mode. A user would need to implement a special
>> KafkaRecordDeserializationSchema and still provide an OffsetsInitializer for
>> the end offset to trigger batch mode.
>>
>> How are both concepts supposed to interact? Do we stop as soon as either
>> concept states that this is the end?
>>
>> We could of course offer some KafkaSourceBuilder#setBounded() without
>> parameters so that a user can implement a special
>> KafkaRecordDeserializationSchema and notify the builder, but this looks
>> awkward to me and is quite error-prone: when a user uses setBounded without
>> overriding isEndOfStream, the application would never emit anything.
>>
>> My understanding is that when a message with the particular pattern
>>> (specified by the user) is encountered, we can have the source operator
>>> emit the high-watermark in such a way as if the particular partition of
>>> this source has reached EOF. And this must have worked since users have
>>> been using KafkaDeserializationSchema::isEndOfStream with the
>>> legacy FlinkKafkaConsumer. Did I miss something here?
>>>
>> Yes, batch mode is different from bounded streaming. [1] We can only fully
>> leverage a statically bounded source by statically defining it as such with
>> the FLIP-27 Source interface. [2]
>>
>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>>> harder for users to implement KafkaRecordDeserializationSchema?
>>>
>> Users mostly use the factory methods that adapt to Flink's
>> DeserializationSchema. We should also offer a builder similar to
>> KafkaRecordSerializationSchemaBuilder.
>>
>> Regarding "how to use it from Table/SQL", suppose we allow users to encode
>>> this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>>
>> I'm not sure if we can/should expose dynamic EOF in SQL but at the very
>> least we should properly support end offsets (as it's now possible). We
>> must avoid removing the current end offsets in favor of
>> KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
>> bounded Kafka sources.
>>
>> e.g. call Collector::close() if the message content matches a
>>> user-specified pattern
>>>
>> No, close is the completely wrong method for that. This method should
>> have never been exposed to the user as it will close the network resources.
>> However, we need a fully functional network stack for proper shutdown.
>>
>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>>> user could implement the dynamic EOF logic in this method, I am worried
>>> that this approach would lead to inferior performance due to double message
>>> deserialization.
>>>
>> That is a fair point. In Ayush's case, however, it's the only way to
>> determine that the pipeline should stop (you pretty much check whether the 5th
>> byte in the message has changed). If you deserialize into a SpecificRecord,
>> then the writer schema version is lost for isEndOfStream(T deserialized).
>>
>> Another concern I have for
>> KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
>> to be called then. If it's in the RecordEmitter, we need to extend the
>> RecordEmitter to support closing the split. If it's in the SplitReader, we
>> probably also need double-deserialization because of FLINK-25132 (the
>> record needs to be deserialized in the RecordEmitter). Maybe you can encode
>> it in the SplitState but this sounds rather cumbersome if it needs to be
>> done for all sources.
>>
>> The reason is that the user's logic will likely depend on the
>>> de-serialized message (as opposed to the raw byte in the
>>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>>> need to deserialize the message inside StopCursor::shouldStop(...) first
>>> and then the message would be de-serialized again by
>>> the PulsarDeserializationSchema, which is specified via
>>> the PulsarSourceBuilder::setDeserializationSchema.
>>>
>> As written before, this is not the case for this specific user. Having the
>> raw message makes it much easier to detect a writer schema change. I'm
>> sure that there are cases where you need to look into the data, though. To
>> avoid double deserialization, a better way may be to pass both the raw and
>> the deserialized message to `shouldStop`, but then we should move the stop
>> logic to the RecordEmitter, as written before.
>>
>> Do you mean that you prefer to replace
>>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>
>> Ideally, yes. But that needs to be backward compatible as it's a
>> PublicEvolving interface.
>>
>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>>> use adminClient only without using KafkaClient. On the other hand, it seems
>>> that there is no performance/correctness concern with the existing
>>> approach? Is this issue related to the discussion of dynamic EOF?
>>
>> I just meant that a user probably only needs access to the adminClient to
>> retrieve the offsets of a topic and that Kafka's PartitionOffsetsRetriever
>> nicely hides the client from the user. I'm sure a user can easily mess up
>> with the admin client in Pulsar (what happens if this is closed, is the
>> client internally used somewhere else?).
>>
>>
>> TL;DR
>> In general, I like the separation of concerns:
>> KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
>> however we call it) is for stopping. Then a user can reuse pre-defined
>> KafkaRecordDeserializationSchema and mix in the stopping logic when needed
>> (remember this is a rare case).
>>
>> In most cases, a user will use Avro/Json/Protobuf + schema registry, so
>> if we provide out-of-the-box support for these formats, the user doesn't
>> need to touch KafkaRecordDeserializationSchema at all. Then it would be
>> nice to have a single interface that determines when the source stops (e.g.
>> StopCondition) with pre-defined implementations (see factory methods in
>> OffsetsInitializer) for Table/SQL. We could even provide a predefined
>> strategy for schema changes when the schema registry is used.
>>
>> If you already have use-cases that rely on the deserialized data, then
>> let's move the stopping logic to RecordEmitter. At this point, I'd propose
>> to pass the raw and deserialized data to the StopCondition.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification
>>
>>
>> On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <li...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks for the suggestion! Sorry for the late reply. I just finished
>>> investigating the PulsarSource/StopCursor as you suggested. Please see
>>> my reply inline.
>>>
>>> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Dong,
>>>>
>>>> I see your point. The main issue with dynamic EOF is that we can't run
>>>> in batch mode. That may be desired in the case of Ayush but there may be
>>>> other use cases where it's not.
>>>>
>>>
>>> Could you help explain why we can not dynamically stop reading from a
>>> source in batch mode?
>>>
>>> My understanding is that when a message with the particular pattern
>>> (specified by the user) is encountered, we can have the source operator
>>> emit the high-watermark in such a way as if the particular partition of
>>> this source has reached EOF. And this must have worked since users have
>>> been using KafkaDeserializationSchema::isEndOfStream with the
>>> legacy FlinkKafkaConsumer. Did I miss something here?
>>>
>>> Additionally, it's quite a bit of code if you'd implement a
>>>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>>>> on how to use it from Table/SQL.
>>>>
>>>
>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>>> harder for user to implement KafkaRecordDeserializationSchema?
>>>
>>> Regarding "how to use it from Table/SQL", suppose we allow users to
>>> encode this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g.
>>> call Collector::close() if the message content matches a user-specified
>>> pattern), then the effect of this change is the same as if the partition has
>>> reached EOF, and Table/SQL can handle this effect as they are doing now
>>> without any extra change. Does this make sense?
>>>
>>>
>>>>
>>>> I think we should get inspired on how PulsarSource is solving it. They
>>>> have an orthogonal interface StopCursor (we could call it StopCondition)
>>>> [1]. It has some default values (I wonder if we could implement them as
>>>> enums for easier Table integration).
>>>>
>>>
>>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>>> user could implement the dynamic EOF logic in this method, I am worried
>>> that this approach would lead to inferior performance due to double message
>>> deserialization.
>>>
>>> The reason is that the user's logic will likely depend on the
>>> de-serialized message (as opposed to the raw byte in the
>>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>>> need to deserialize the message inside StopCursor::shouldStop(...) first
>>> and then the message would be de-serialized again by
>>> the PulsarDeserializationSchema, which is specified via
>>> the PulsarSourceBuilder::setDeserializationSchema.
>>>
>>> In comparison, messages can be deserialized only once if we allow users
>>> to specify the dynamic EOF logic inside
>>> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>>>
>>>
>>>> Ideally, this interface would subsume OffsetsInitializer on stopping
>>>> side. I think it was not wise to use OffsetsInitializer also for stop
>>>> offsets as things like OffsetResetStrategy do not make any sense.
>>>>
>>>
>>> Do you mean that you prefer to replace
>>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>
>>> Without digging into detail whether this replacement is feasible, I
>>> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
>>> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
>>> due to the double deserialization issue described above), I guess it is
>>> probably simpler to separate this from the discussion of the dynamic EOF?
>>>
>>>
>>>>
>>>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid
>>>> having to hand in the KafkaClient (as we do in Pulsar).
>>>>
>>>
>>> Do you mean that you prefer to remove KafkaClient from
>>> PartitionOffsetsRetrieverImpl?
>>>
>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>>> use adminClient only without using KafkaClient. On the other hand, it seems
>>> that there is no performance/correctness concern with the existing
>>> approach? Is this issue related to the discussion of dynamic EOF?
>>>
>>>
>>>> I hope I gave some pointers.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>>>
>>>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:
>>>>
>>>>> Yep,  dynamic schema change could be a good solution for the
>>>>> particular use-case mentioned by Ayush.
>>>>>
>>>>> On the other hand, I have heard of valid use-cases where we want to
>>>>> stop the job based on a control message. For example, let's say we have a
>>>>> Flink job that keeps processing stock transaction data fetched from Kafka
>>>>> in real time. Suppose the stock market closes at 4pm, we probably want the
>>>>> Flink job to stop after it has processed all the transaction data of that
>>>>> day, instead of running it for the whole day, in order to save CPU cost.
>>>>>
>>>>> As of Flink 1.13, users can achieve this goal by sending a special
>>>>> message to the Kafka topic, and encode logic in the deserializer such that
>>>>> Flink job stops when this message is observed. IMO, this seems like a
>>>>> reasonable approach to support the above use-case.
>>>>>
>>>>> One possible approach to keep supporting this use-case in Flink 1.15
>>>>> is to allow user to signal the "end of stream" by calling
>>>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>>>
>>>>> What do you think?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Wouldn't it be better to ask the Iceberg maintainers to support
>>>>>> dynamic schema change?
>>>>>>
>>>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Ayush,
>>>>>>>
>>>>>>> Your use-case should be supported.  Sorry, we don't have a good way
>>>>>>> to support this in Flink 1.14.
>>>>>>>
>>>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <
>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>
>>>>>>>> My usecase is that as soon as the avro message version is changed,
>>>>>>>> I want to reload the job graph so that I can update the downstream iceberg
>>>>>>>> table.
>>>>>>>>
>>>>>>>> Iceberg FlinkSink takes the table schema at job start and it cannot
>>>>>>>> be updated during runtime. So, I want to trigger graceful shutdown and
>>>>>>>> restart the job.
>>>>>>>>
>>>>>>>> Can I reload the job graph to achieve that?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ayush,
>>>>>>>>>
>>>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>>>> Kafka. For the new Kafka source, the recommended way is to use the bounded mode
>>>>>>>>> like this
>>>>>>>>>
>>>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>>>> ...
>>>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>>>                 .build();
>>>>>>>>>
>>>>>>>>> You can implement your own OffsetsInitializer or use a provided
>>>>>>>>> one.
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>>>>
>>>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 8, 2021 at 3:29 PM Ayush Chauhan <ay...@zomato.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>>>> metrobert@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ayush,
>>>>>>>>>>>>
>>>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you
>>>>>>>>>>>> send me a link to it?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream()
>>>>>>>>>>>>> as now according to docs this method will no longer be used to determine
>>>>>>>>>>>>> the end of the stream.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Dong Lin <li...@gmail.com>.
Hi Arvid,

Thanks a lot for the detailed reply.

Just to clarify, I don't plan to ask users to implement
KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
not add any public API, but to expect users to re-use the
existing Collector::close() API inside
KafkaRecordDeserializationSchema::deserialize(...). If a message with
the user-specified pattern has arrived, the user can invoke
Collector::close(), which signals Flink to stop reading from the
corresponding source split.
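
To illustrate the idea, here is a minimal sketch of the proposed behavior.
It is not Flink code and does not describe what Flink does today:
SketchCollector, RecordingCollector, ControlMessageDeserializer and
DynamicEofSketch are hypothetical stand-ins, and the "END" control message
is an assumed convention chosen by the user.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a collector with the two methods discussed here. */
interface SketchCollector<T> {
    void collect(T record);

    void close();
}

/** Remembers emitted records and whether close() was called (the proposed EOF signal). */
final class RecordingCollector<T> implements SketchCollector<T> {
    final List<T> records = new ArrayList<>();
    boolean closed = false;

    @Override
    public void collect(T record) {
        if (closed) {
            throw new IllegalStateException("split already closed");
        }
        records.add(record);
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Deserializer that treats a user-chosen control message as the end of the stream. */
final class ControlMessageDeserializer {
    private final String endMarker;

    ControlMessageDeserializer(String endMarker) {
        this.endMarker = endMarker;
    }

    void deserialize(byte[] rawValue, SketchCollector<String> out) {
        String value = new String(rawValue, StandardCharsets.UTF_8);
        if (endMarker.equals(value)) {
            // Proposed semantics: ask the reader to stop reading this split.
            out.close();
            return;
        }
        out.collect(value);
    }
}

final class DynamicEofSketch {
    /** Feeds messages to the deserializer until the collector is closed. */
    static List<String> readSplit(List<String> messages, String endMarker) {
        ControlMessageDeserializer deserializer = new ControlMessageDeserializer(endMarker);
        RecordingCollector<String> out = new RecordingCollector<>();
        for (String message : messages) {
            deserializer.deserialize(message.getBytes(StandardCharsets.UTF_8), out);
            if (out.closed) {
                break; // the reader stops reading from this split
            }
        }
        return out.records;
    }
}
```

In this sketch the message is deserialized only once, and records that follow
the control message in the same split are never emitted.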

Here are a few clarifications in response to the discussion:

1) The boundedness of the source and execution.runtime-mode would not be
affected by this proposal. Users can keep using the existing setting
with or without the dynamic EOF.
2) The dynamic EOF works independently of the
stoppingOffsetsInitializer. When reading from Kafka Source, users can
rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
job stops reading from the source split when either condition is met.
3) Suppose users can specify the dynamic EOF in
KafkaRecordDeserializationSchema::deserialize(...), then users have access
to the entire ConsumerRecord. This approach could address Ayush's use-case.
4) Suppose we choose to do it in
KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
happens inside the RecordEmitter. Yes we will need to be able to close the
split.
5) The majority of users, who do not want dynamic EOF, can keep using the
existing out-of-the-box support for Avro/Json/Protobuf. Advanced users who
want dynamic EOF need to encode the dynamic EOF logic in a method similar
to KafkaRecordDeserializationSchema (with access to the raw message)
anyway. Adding the dynamic EOF support would not make their life harder.
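
The "either condition" rule from point 2 can be sketched as follows. This is
only an illustration under stated assumptions: EitherConditionSketch is a
hypothetical class, the partition is modelled as an in-memory list whose
index is the offset, and the stopping offset is treated as exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class EitherConditionSketch {
    /**
     * Reads a partition from offset 0 and stops at whichever comes first:
     * the stopping offset (exclusive) or a record matching the dynamic EOF
     * predicate. Pass Long.MAX_VALUE when no stopping offset is configured.
     */
    static List<String> read(
            List<String> partition, long stoppingOffset, Predicate<String> isEof) {
        List<String> emitted = new ArrayList<>();
        for (int offset = 0; offset < partition.size(); offset++) {
            if (offset >= stoppingOffset) {
                break; // offset-based stopping condition
            }
            String value = partition.get(offset);
            if (isEof.test(value)) {
                break; // message-based (dynamic EOF) stopping condition
            }
            emitted.add(value);
        }
        return emitted;
    }
}
```

With only the dynamic EOF configured, the stopping offset is effectively
unbounded and the split ends at the first matching message.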

Based on the discussion so far, it looks like two approaches have been
mentioned:

1) Let users call Collector::close() API inside
KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.

2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor), where
StopCursor subsumes all existing functionalities of
the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
both the raw and the deserialized message.
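
For the second approach, a stop condition that sees both the raw and the
deserialized message could look roughly like the sketch below. Everything
here is hypothetical: StopCondition and SchemaChangeStopCondition are
illustrative names, not existing Flink or Pulsar APIs. The sketch also
assumes the messages use the Confluent schema registry wire format (one
magic byte 0, then a 4-byte big-endian schema id, then the payload), which
the thread does not confirm for Ayush's setup.

```java
import java.nio.ByteBuffer;

/** Hypothetical interface that receives both forms of the same message. */
interface StopCondition<T> {
    boolean shouldStop(byte[] rawValue, T deserialized);
}

/** Stops once the writer schema id differs from the first one seen. */
final class SchemaChangeStopCondition<T> implements StopCondition<T> {
    private Integer firstSchemaId;

    /** Reads the schema id from bytes 1..4 of an assumed Confluent-framed message. */
    static int schemaId(byte[] rawValue) {
        if (rawValue.length < 5 || rawValue[0] != 0) {
            throw new IllegalArgumentException("not in the assumed wire format");
        }
        return ByteBuffer.wrap(rawValue, 1, 4).getInt();
    }

    /** Test helper: frames a payload with the magic byte and a schema id. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    @Override
    public boolean shouldStop(byte[] rawValue, T deserialized) {
        int id = schemaId(rawValue);
        if (firstSchemaId == null) {
            firstSchemaId = id; // remember the schema the job started with
            return false;
        }
        // The deserialized value is unused here; other conditions may need it.
        return id != firstSchemaId;
    }
}
```

This condition only needs the raw bytes, which matches the schema-change
use-case; a condition based on a control message would look at the
deserialized value instead, without deserializing twice.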

It seems that the second approach involves many more API changes than the
first approach (including deprecation of some existing APIs).

Regarding the first approach, could you help explain why "close is the
completely wrong method for that"? My understanding is the close() method
indicates that the caller no longer needs to read from this source split
and the associated network resource could be released. Why is it wrong for
a user to call this method?


On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Dong,
>
> Could you help explain why we can not dynamically stop reading from a
>> source in batch mode?
>>
> We can, but we cannot easily determine if the source is supposed to run in
> batch or streaming. A user would need to implement a special
> KafkaRecordDeserializationSchema and still provide an OffsetsInitializer for
> the end offset to trigger batch mode.
>
> How are both concepts supposed to interact? Are we stopping as soon as any
> of the concepts states that this is the end?
>
> We could of course offer some KafkaSourceBuilder#setBounded() without
> parameters so that a user can implement a special
> KafkaRecordDeserializationSchema and notify the builder, but this looks
> awkward to me and is quite error-prone: When a user uses setBounded without
> overriding isEndOfStream, the application would never emit anything.
>
> My understanding is that when a message with the particular pattern
>> (specified by the user) is encountered, we can have the source operator
>> emit the high-watermark in such a way as if the particular partition of
>> this source has reached EOF. And this must have worked since users have
>> been using KafkaDeserializationSchema::isEndOfStream with the
>> legacy FlinkKafkaConsumer. Did I miss something here?
>>
> Yes, batch mode is different from bounded streaming. [1] We can only fully
> leverage a statically bounded source by statically defining it as such with
> the FLIP-27 Source interface. [2]
>
> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>> harder for user to implement KafkaRecordDeserializationSchema?
>>
> Users mostly use the factory methods that adapt to Flink's
> DeserializationSchema. We should also offer a builder similarly to
> KafkaRecordSerializationSchemaBuilder.
>
> Regarding "how to use it from Table/SQL", suppose we allow users to encode
>> this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>
> I'm not sure if we can/should expose dynamic EOF in SQL but at the very
> least we should properly support end offsets (as it's now possible). We
> must avoid removing the current end offsets in favor of
> KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
> bounded Kafka sources.
>
> e.g. call Collector::close() if the message content matches a
>> user-specified pattern
>>
> No, close is the completely wrong method for that. This method should have
> never been exposed to the user as it will close the network resources.
> However, we need a fully functional network stack for proper shutdown.
>
> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>> user could implement the dynamic EOF logic in this method, I am worried
>> that this approach would lead to inferior performance due to double message
>> deserialization.
>>
> That is a fair point. In case of Ayush, however, it's the only way to
> determine that the pipeline should stop (you pretty much compare if the 5th
> byte in the message has changed). If you deserialize into a SpecificRecord,
> then the writer schema version is lost for isEndOfStream(T deserialized).
>
> Another concern I have for
> KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
> to be called then. If it's in the RecordEmitter, we need to extend the
> RecordEmitter to support closing the split. If it's in the SplitReader, we
> probably also need double-deserialization because of FLINK-25132 (the
> record needs to be deserialized in the RecordEmitter). Maybe you can encode
> it in the SplitState but this sounds rather cumbersome if it needs to be
> done for all sources.
>
> The reason is that the user's logic will likely depend on the
>> de-serialized message (as opposed to the raw byte in the
>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>> need to deserialize the message inside StopCursor::shouldStop(...) first
>> and then the message would be de-serialized again by
>> the PulsarDeserializationSchema, which is specified via
>> the PulsarSourceBuilder::setDeserializationSchema.
>>
> As written before, this is not the case of the specific user. Having the
> raw message makes it much easier to determine a writer schema change. I'm
> sure that there are cases, where you need to look into the data though. To
> avoid double-deserialization, a better way may be to pass both the raw and
> the deserialized message to `shouldStop` but then we should move the stop
> logic to RecordEmitter as written before.
>
> Do you mean that you prefer to replace
>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>
> Ideally, yes. But that needs to be backward compatible as it's a
> PublicEvolving interface.
>
> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
>> only without using KafkaClient. On the other hand, it seems that there is
>> no performance/correctness concern with the existing approach? Is this
>> issue related to the discussion of dynamic EOF?
>
> I just meant that a user probably only needs access to the adminClient to
> retrieve the offsets of a topic and that Kafka's PartitionOffsetsRetriever
> nicely hides the client from the user. I'm sure a user can easily mess up
> with the admin client in Pulsar (what happens if this is closed, is the
> client internally used somewhere else?).
>
>
> TL;DR
> In general, I like the separation of concerns:
> KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
> however we call it) is for stopping. Then a user can reuse pre-defined
> KafkaRecordDeserializationSchema and mix in the stopping logic when needed
> (remember this is a rare case).
>
> In most cases, a user will use Avro/Json/Protobuf + schema registry, so if
> we provide out-of-the-box support for these formats, the user doesn't need
> to touch KafkaRecordDeserializationSchema at all. Then it would be nice to
> have a single interface that determines when the source stops (e.g.
> StopCondition) with pre-defined implementations (see factory methods in
> OffsetsInitializer) for Table/SQL. We could even provide a predefined
> strategy for schema changes when the schema registry is used.
>
> If you already have use-cases that rely on the deserialized data, then
> let's move the stopping logic to RecordEmitter. At this point, I'd propose
> to pass the raw and deserialized data to the StopCondition.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification
>
>
> On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <li...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the suggestion! Sorry for the late reply. I just finished
>> investigating the PulsarSource/StopCursor as you suggested. Please see
>> my reply inline.
>>
>> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Dong,
>>>
>>> I see your point. The main issue with dynamic EOF is that we can't run
>>> in batch mode. That may be desired in the case of Ayush but there may be
>>> other use cases where it's not.
>>>
>>
>> Could you help explain why we can not dynamically stop reading from a
>> source in batch mode?
>>
>> My understanding is that when a message with the particular pattern
>> (specified by the user) is encountered, we can have the source operator
>> emit the high-watermark in such a way as if the particular partition of
>> this source has reached EOF. And this must have worked since users have
>> been using KafkaDeserializationSchema::isEndOfStream with the
>> legacy FlinkKafkaConsumer. Did I miss something here?
>>
>> Additionally, it's quite a bit of code if you'd implement a
>>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>>> on how to use it from Table/SQL.
>>>
>>
>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>> harder for user to implement KafkaRecordDeserializationSchema?
>>
>> Regarding "how to use it from Table/SQL", suppose we allow users to encode
>> this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
>> Collector::close() if the message content matches a user-specified
>> pattern), then the effect of this change is the same as if the partition has
>> reached EOF, and Table/SQL can handle this effect as they are doing now
>> without any extra change. Does this make sense?
>>
>>
>>>
>>> I think we should get inspired on how PulsarSource is solving it. They
>>> have an orthogonal interface StopCursor (we could call it StopCondition)
>>> [1]. It has some default values (I wonder if we could implement them as
>>> enums for easier Table integration).
>>>
>>
>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>> user could implement the dynamic EOF logic in this method, I am worried
>> that this approach would lead to inferior performance due to double message
>> deserialization.
>>
>> The reason is that the user's logic will likely depend on the
>> de-serialized message (as opposed to the raw byte in the
>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>> need to deserialize the message inside StopCursor::shouldStop(...) first
>> and then the message would be de-serialized again by
>> the PulsarDeserializationSchema, which is specified via
>> the PulsarSourceBuilder::setDeserializationSchema.
>>
>> In comparison, messages can be deserialized only once if we allow users
>> to specify the dynamic EOF logic inside
>> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>>
>>
>>> Ideally, this interface would subsume OffsetsInitializer on stopping
>>> side. I think it was not wise to use OffsetsInitializer also for stop
>>> offsets as things like OffsetResetStrategy do not make any sense.
>>>
>>
>> Do you mean that you prefer to replace
>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>
>> Without digging into detail whether this replacement is feasible, I
>> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
>> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
>> due to the double deserialization issue described above), I guess it is
>> probably simpler to separate this from the discussion of the dynamic EOF?
>>
>>
>>>
>>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
>>> to hand in the KafkaClient (as we do in Pulsar).
>>>
>>
>> Do you mean that you prefer to remove KafkaClient from
>> PartitionOffsetsRetrieverImpl?
>>
>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>> use adminClient only without using KafkaClient. On the other hand, it seems
>> that there is no performance/correctness concern with the existing
>> approach? Is this issue related to the discussion of dynamic EOF?
>>
>>
>>> I hope I gave some pointers.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>>
>>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:
>>>
>>>> Yep,  dynamic schema change could be a good solution for the particular
>>>> use-case mentioned by Ayush.
>>>>
>>>> On the other hand, I have heard of valid use-cases where we want to
>>>> stop the job based on a control message. For example, let's say we have a
>>>> Flink job that keeps processing stock transaction data fetched from Kafka
>>>> in real time. Suppose the stock market closes at 4pm, we probably want the
>>>> Flink job to stop after it has processed all the transaction data of that
>>>> day, instead of running it for the whole day, in order to save CPU cost.
>>>>
>>>> As of Flink 1.13, users can achieve this goal by sending a special
>>>> message to the Kafka topic, and encode logic in the deserializer such that
>>>> Flink job stops when this message is observed. IMO, this seems like a
>>>> reasonable approach to support the above use-case.
>>>>
>>>> One possible approach to keep supporting this use-case in Flink 1.15 is
>>>> to allow user to signal the "end of stream" by calling
>>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>>
>>>> What do you think?
>>>>
>>>>
>>>>
>>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Wouldn't it be better to ask the Iceberg maintainers to support
>>>>> dynamic schema change?
>>>>>
>>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ayush,
>>>>>>
>>>>>> Your use-case should be supported.  Sorry, we don't have a good way
>>>>>> to support this in Flink 1.14.
>>>>>>
>>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>>
>>>>>> Thanks,
>>>>>> Dong
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <
>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>
>>>>>>> My usecase is that as soon as the avro message version is changed, I
>>>>>>> want to reload the job graph so that I can update the downstream iceberg
>>>>>>> table.
>>>>>>>
>>>>>>> Iceberg FlinkSink takes the table schema at job start and it cannot
>>>>>>> be updated during runtime. So, I want to trigger graceful shutdown and
>>>>>>> restart the job.
>>>>>>>
>>>>>>> Can I reload the job graph to achieve that?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Ayush,
>>>>>>>>
>>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>>> Kafka. For the new Kafka source, the recommended way is to use the bounded mode
>>>>>>>> like this
>>>>>>>>
>>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>>> ...
>>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>>                 .build();
>>>>>>>>
>>>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>>>
>>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 3:29 PM Ayush Chauhan <ay...@zomato.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>>> metrobert@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ayush,
>>>>>>>>>>>
>>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>>>> me a link to it?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream()
>>>>>>>>>>>> as now according to docs this method will no longer be used to determine
>>>>>>>>>>>> the end of the stream.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Arvid Heise <ar...@apache.org>.
Hi Dong,

> Could you help explain why we can not dynamically stop reading from a
> source in batch mode?
>
We can, but we cannot easily determine whether the source is supposed to run
in batch or streaming mode. A user would need to implement a special
KafkaRecordDeserializationSchema and still provide an OffsetsInitializer for
the end offset to trigger batch mode.

How are both concepts supposed to interact? Do we stop as soon as either
concept states that this is the end?

We could of course offer some KafkaSourceBuilder#setBounded() without
parameters so that a user can implement a special
KafkaRecordDeserializationSchema and notify the builder, but this looks
awkward to me and is quite error-prone: when a user calls setBounded without
overriding isEndOfStream, the application would never emit anything.

> My understanding is that when a message with the particular pattern
> (specified by the user) is encountered, we can have the source operator
> emit the high-watermark in such a way as if the particular partition of
> this source has reached EOF. And this must have worked since users have
> been using KafkaDeserializationSchema::isEndOfStream with the
> legacy FlinkKafkaConsumer. Did I miss something here?
>
Yes, batch mode is different from bounded streaming [1]. We can only fully
leverage a statically bounded source by statically defining it as such with
the FLIP-27 Source interface [2].

> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
> harder for user to implement KafkaRecordDeserializationSchema?
>
Users mostly use the factory methods that adapt to Flink's
DeserializationSchema. We should also offer a builder similar to
KafkaRecordSerializationSchemaBuilder.

> Regarding "how to use it from Table/SQL", support we allow user to encode
> this dynamic EOF logic inside KafkaRecordDeserializationSchema.

I'm not sure if we can/should expose dynamic EOF in SQL but at the very
least we should properly support end offsets (as it's now possible). We
must avoid removing the current end offsets in favor of
KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
bounded Kafka sources.

> e.g. call Collector::close() if the message content matches a
> user-specified pattern
>
No, close is completely the wrong method for that. This method should never
have been exposed to the user, as it closes the network resources.
However, we need a fully functional network stack for a proper shutdown.

> It appears that StopCursor::shouldStop(...) takes a raw Message. While user
> could implement the dynamic EOF logic in this method, I am worried that
> this approach would lead to inferior performance due to double message
> deserialization.
>
That is a fair point. In Ayush's case, however, it's the only way to
determine that the pipeline should stop (you pretty much check whether the
fifth byte of the message has changed). If you deserialize into a
SpecificRecord, then the writer schema version is lost for
isEndOfStream(T deserialized).
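
A minimal sketch of such a raw-bytes check, under assumptions the thread
does not state: the record values are assumed to use the Confluent Schema
Registry wire format (one magic byte 0x0 followed by a 4-byte big-endian
schema ID), so the fifth byte is the low byte of that ID. The sketch compares
the whole ID rather than only the fifth byte. WriterSchemaChangeDetector and
its methods are hypothetical names, not part of Flink or Kafka.

```java
import java.nio.ByteBuffer;

/**
 * Sketch only: detects a writer-schema change from the raw record value.
 * Assumes a header of magic byte 0x0 + 4-byte big-endian schema ID.
 * Hypothetical helper; not part of Flink or Kafka.
 */
final class WriterSchemaChangeDetector {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5;

    // Schema ID of the first record seen; null until then.
    private Integer initialSchemaId;

    /** Reads the schema ID stored in bytes 1..4 of the raw value. */
    static int schemaId(byte[] rawValue) {
        if (rawValue == null
                || rawValue.length < HEADER_LENGTH
                || rawValue[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException(
                    "Value does not carry the assumed 5-byte header");
        }
        // ByteBuffer is big-endian by default.
        return ByteBuffer.wrap(rawValue, 1, 4).getInt();
    }

    /** Returns true once a record's schema ID differs from the first one seen. */
    boolean shouldStop(byte[] rawValue) {
        int id = schemaId(rawValue);
        if (initialSchemaId == null) {
            initialSchemaId = id;
            return false;
        }
        return id != initialSchemaId;
    }
}
```

Because the check only touches the header, it needs no deserialization at
all, which is why the raw message is the convenient input for this use case.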

Another concern I have for
KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
to be called then. If it's in the RecordEmitter, we need to extend the
RecordEmitter to support closing the split. If it's in the SplitReader, we
probably also need double-deserialization because of FLINK-25132 (the
record needs to be deserialized in the RecordEmitter). Maybe you can encode
it in the SplitState but this sounds rather cumbersome if it needs to be
done for all sources.

> The reason is that the user's logic will likely depend on the de-serialized
> message (as opposed to the raw byte in the
> org.apache.pulsar.client.api.Message.getData()). In this case, users will
> need to deserialize the message inside StopCursor::shouldStop(...) first
> and then the message would be de-serialized again by
> the PulsarDeserializationSchema, which is specified via
> the PulsarSourceBuilder::setDeserializationSchema.
>
As written before, this is not the case for this specific user. Having the
raw message makes it much easier to detect a writer schema change. I'm
sure that there are cases where you need to look into the data, though. To
avoid double deserialization, a better way may be to pass both the raw and
the deserialized message to `shouldStop`, but then we should move the stop
logic to the RecordEmitter, as written before.

> Do you mean that you prefer to replace
> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>
Ideally, yes. But that needs to be backward compatible as it's a
PublicEvolving interface.

> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
> only without using KafkaClient. On the other hand, it seems that there is
> no performance/correctness concern with the existing approach? Is this
> issue related to the discussion of dynamic EOF?

I just meant that a user probably only needs access to the adminClient to
retrieve the offsets of a topic and that Kafka's PartitionOffsetsRetriever
nicely hides the client from the user. I'm sure a user can easily mess
things up with the admin client in Pulsar (what happens if it is closed? Is
the client used internally somewhere else?).


TL;DR
In general, I like the separation of concerns:
KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
however we call it) is for stopping. Then a user can reuse pre-defined
KafkaRecordDeserializationSchema and mix in the stopping logic when needed
(remember this is a rare case).

In most cases, a user will use Avro/Json/Protobuf + schema registry, so if
we provide out-of-the-box support for these formats, the user doesn't need
to touch KafkaRecordDeserializationSchema at all. Then it would be nice to
have a single interface that determines when the source stops (e.g.
StopCondition) with pre-defined implementations (see factory methods in
OffsetsInitializer) for Table/SQL. We could even provide a predefined
strategy for schema changes when the schema registry is used.

If you already have use cases that rely on the deserialized data, then
let's move the stopping logic to the RecordEmitter. At that point, I'd
propose passing both the raw and the deserialized data to the StopCondition.
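
To make the proposal concrete, here is a rough sketch of what such an
interface could look like. Everything in it is hypothetical: StopCondition,
its or(...) combinator, and StopConditionSketch.emitUntilStop do not exist in
Flink, and the loop is only a stand-in for the emitting side of a source, not
the real RecordEmitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical interface sketched from this thread; it does not exist in Flink. */
@FunctionalInterface
interface StopCondition<T> {
    /** Decides from the raw bytes and the already deserialized value whether to stop. */
    boolean shouldStop(byte[] rawValue, T deserialized);

    /** Stops as soon as either this condition or the other one says so. */
    default StopCondition<T> or(StopCondition<T> other) {
        return (raw, value) -> shouldStop(raw, value) || other.shouldStop(raw, value);
    }
}

/** Stand-in for the emitting side of a source; not the real RecordEmitter. */
final class StopConditionSketch {
    static <T> List<T> emitUntilStop(
            List<byte[]> rawRecords,
            Function<byte[], T> deserializer,
            StopCondition<T> condition) {
        List<T> emitted = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            T value = deserializer.apply(raw); // deserialized exactly once
            if (condition.shouldStop(raw, value)) {
                break; // the stopping record itself is not emitted
            }
            emitted.add(value);
        }
        return emitted;
    }
}
```

Since the condition is consulted after deserialization and receives both
views of the record, a header-based check and a payload-based check (for
example a control message) can be combined with or(...) without
deserializing twice.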

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification


On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <li...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for the suggestion! Sorry for the late reply. I just finished
> investigating the PulsarSource/StopCursor as you suggested. Please see my
> reply inline.
>
> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Dong,
>>
>> I see your point. The main issue with dynamic EOF is that we can't run in
>> batch mode. That may be desired in the case of Ayush but there may be other
>> use cases where it's not.
>>
>
> Could you help explain why we can not dynamically stop reading from a
> source in batch mode?
>
> My understanding is that when a message with the particular pattern
> (specified by the user) is encountered, we can have the source operator
> emit the high-watermark in such a way as if the particular partition of
> this source has reached EOF. And this must have worked since users have
> been using KafkaDeserializationSchema::isEndOfStream with the
> legacy FlinkKafkaConsumer. Did I miss something here?
>
>> Additionally, it's quite a bit of code if you'd implement a
>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>> on how to use it from Table/SQL.
>>
>
> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
> harder for user to implement KafkaRecordDeserializationSchema?
>
> Regarding "how to use it from Table/SQL", support we allow user to encode
> this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
> Collector::close() if the message content matches a user-specified
> pattern), then effect of this change is same as if the partition has
> reached EOF, and Table/SQL can handle this effect as they are doing now
> without any extra change. Does this make sense?
>
>
>>
>> I think we should get inspired on how PulsarSource is solving it. They
>> have an orthogonal interface StopCursor (we could call it StopCondition)
>> [1]. It has some default values (I wonder if we could implement them as
>> enums for easier Table integration).
>>
>
> It appears that StopCursor::shouldStop(...) takes a raw Message. While
> user could implement the dynamic EOF logic in this method, I am worried
> that this approach would lead to inferior performance due to double message
> deserialization.
>
> The reason is that the user's logic will likely depend on the
> de-serialized message (as opposed to the raw byte in the
> org.apache.pulsar.client.api.Message.getData()). In this case, users will
> need to deserialize the message inside StopCursor::shouldStop(...) first
> and then the message would be de-serialized again by
> the PulsarDeserializationSchema, which is specified via
> the PulsarSourceBuilder::setDeserializationSchema.
>
> In comparison, messages can be deserialized only once if we allow users to
> specify the dynamic EOF logic inside
> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>
>
>> Ideally, this interface would subsume OffsetsInitializer on stopping
>> side. I think it was not wise to use OffsetsInitializer also for stop
>> offsets as things like OffsetResetStrategy do not make any sense.
>>
>
> Do you mean that you prefer to replace
> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>
> Without digging into detail whether this replacement is feasible, I
> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
> due to the double serialization issue described above), I guess it is
> probably simpler to separate this from the discussion of the dynamic EOF?
>
>
>>
>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
>> to hand in the KafkaClient (as we do in Pulsar).
>>
>
> Do you mean that you prefer to remove KafkaClient from
> PartitionOffsetsRetrieverImpl?
>
> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
> only without using KafkaClient. On the other hand, it seems that there is
> no performance/correctness concern with the existing approach? Is this
> issue related to the discussion of dynamic EOF?
>
>
>> I hope I gave some pointers.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>
>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:
>>
>>> Yep,  dynamic schema change could be a good solution for the particular
>>> use-case mentioned by Ayush.
>>>
>>> On the other hand, I have heard of valid use-cases where we want to stop
>>> the job based on a control message. For example, let's say we have a Flink
>>> job that keeps processing stock transaction data fetched from Kafka in real
>>> time. Suppose the stock market closes at 4pm, we probably want the Flink
>>> job to stop after it has processed all the transaction data of that day,
>>> instead of running it for the whole day, in order to save CPU cost.
>>>
>>> As of Flink 1.13, users can achieve this goal by sending a special
>>> message to the Kafka topic, and encode logic in the deserializer such that
>>> Flink job stops when this message is observed. IMO, this seems like a
>>> reasonable approach to support the above use-case.
>>>
>>> One possible approach to keep supporting this use-case in Flink 1.15 is
>>> to allow user to signal the "end of stream" by calling
>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>
>>> What do you think?
>>>
>>>
>>>
>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
>>>> schema change?
>>>>
>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com> wrote:
>>>>
>>>>> Hi Ayush,
>>>>>
>>>>> Your use-case should be supported.  Sorry, we don't have a good way to
>>>>> support this in Flink 1.14.
>>>>>
>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>
>>>>> Thanks,
>>>>> Dong
>>>>>
>>>>>
>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ay...@zomato.com>
>>>>> wrote:
>>>>>
>>>>>> My usecase is that as soon as the avro message version is changed, I
>>>>>> want to reload the job graph so that I can update the downstream iceberg
>>>>>> table.
>>>>>>
>>>>>> Iceberg FlinkSink take table schema during the job start and cannot
>>>>>> be updated during runtime. So, I want to trigger graceful shutdown and
>>>>>> restart the job.
>>>>>>
>>>>>> Can I reload the job graph to achieve that?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Ayush,
>>>>>>>
>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>> Kafka. For new Kafka source, the recommended way is to use the bounded mode
>>>>>>> like this
>>>>>>>
>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>> ...
>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>                 .build();
>>>>>>>
>>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>>
>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>>
>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 15:29, Ayush Chauhan <ay...@zomato.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>> metrobert@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ayush,
>>>>>>>>>>
>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>>> me a link to it?
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream()
>>>>>>>>>>> as now according to docs this method will no longer be used to determine
>>>>>>>>>>> the end of the stream.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Dong Lin <li...@gmail.com>.
Hi Arvid,

Thanks for the suggestion! Sorry for the late reply. I just finished
investigating the PulsarSource/StopCursor as you suggested. Please see my
reply inline.

On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Dong,
>
> I see your point. The main issue with dynamic EOF is that we can't run in
> batch mode. That may be desired in the case of Ayush but there may be other
> use cases where it's not.
>

Could you help explain why we can not dynamically stop reading from a
source in batch mode?

My understanding is that when a message with the particular pattern
(specified by the user) is encountered, we can have the source operator
emit the high-watermark in such a way as if the particular partition of
this source has reached EOF. And this must have worked since users have
been using KafkaDeserializationSchema::isEndOfStream with the
legacy FlinkKafkaConsumer. Did I miss something here?

> Additionally, it's quite a bit of code if you'd implement a
> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
> on how to use it from Table/SQL.
>

Hmm.. users already need to provide a KafkaRecordDeserializationSchema
via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
harder for user to implement KafkaRecordDeserializationSchema?

Regarding "how to use it from Table/SQL", suppose we allow users to encode
this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
Collector::close() if the message content matches a user-specified
pattern). Then the effect of this change is the same as if the partition
had reached EOF, and Table/SQL can handle this effect as they do now
without any extra change. Does this make sense?


>
> I think we should get inspired on how PulsarSource is solving it. They
> have an orthogonal interface StopCursor (we could call it StopCondition)
> [1]. It has some default values (I wonder if we could implement them as
> enums for easier Table integration).
>

It appears that StopCursor::shouldStop(...) takes a raw Message. While users
could implement the dynamic EOF logic in this method, I am worried that
this approach would lead to inferior performance due to double message
deserialization.

The reason is that the user's logic will likely depend on the de-serialized
message (as opposed to the raw byte in the
org.apache.pulsar.client.api.Message.getData()). In this case, users will
need to deserialize the message inside StopCursor::shouldStop(...) first
and then the message would be de-serialized again by
the PulsarDeserializationSchema, which is specified via
the PulsarSourceBuilder::setDeserializationSchema.

In comparison, messages can be deserialized only once if we allow users to
specify the dynamic EOF logic inside
KafkaRecordDeserializationSchema/PulsarDeserializationSchema.


> Ideally, this interface would subsume OffsetsInitializer on stopping side.
> I think it was not wise to use OffsetsInitializer also for stop offsets as
> things like OffsetResetStrategy do not make any sense.
>

Do you mean that you prefer to replace
KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?

Without digging into whether this replacement is feasible, I agree
StopCursor seems to be cleaner than OffsetsInitializer. On the other
hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
due to the double deserialization issue described above), I guess it is
probably simpler to separate this from the discussion of the dynamic EOF?


>
> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
> to hand in the KafkaClient (as we do in Pulsar).
>

Do you mean that you prefer to remove KafkaClient from
PartitionOffsetsRetrieverImpl?

I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
only without using KafkaClient. On the other hand, it seems that there is
no performance/correctness concern with the existing approach? Is this
issue related to the discussion of dynamic EOF?


> I hope I gave some pointers.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>
> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:
>
>> Yep,  dynamic schema change could be a good solution for the particular
>> use-case mentioned by Ayush.
>>
>> On the other hand, I have heard of valid use-cases where we want to stop
>> the job based on a control message. For example, let's say we have a Flink
>> job that keeps processing stock transaction data fetched from Kafka in real
>> time. Suppose the stock market closes at 4pm, we probably want the Flink
>> job to stop after it has processed all the transaction data of that day,
>> instead of running it for the whole day, in order to save CPU cost.
>>
>> As of Flink 1.13, users can achieve this goal by sending a special
>> message to the Kafka topic, and encode logic in the deserializer such that
>> Flink job stops when this message is observed. IMO, this seems like a
>> reasonable approach to support the above use-case.
>>
>> One possible approach to keep supporting this use-case in Flink 1.15 is
>> to allow user to signal the "end of stream" by calling
>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>
>> What do you think?
>>
>>
>>
>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
>>> schema change?
>>>
>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com> wrote:
>>>
>>>> Hi Ayush,
>>>>
>>>> Your use-case should be supported.  Sorry, we don't have a good way to
>>>> support this in Flink 1.14.
>>>>
>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>
>>>> Thanks,
>>>> Dong
>>>>
>>>>
>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ay...@zomato.com>
>>>> wrote:
>>>>
>>>>> My usecase is that as soon as the avro message version is changed, I
>>>>> want to reload the job graph so that I can update the downstream iceberg
>>>>> table.
>>>>>
>>>>> Iceberg FlinkSink take table schema during the job start and cannot be
>>>>> updated during runtime. So, I want to trigger graceful shutdown and restart
>>>>> the job.
>>>>>
>>>>> Can I reload the job graph to achieve that?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Ayush,
>>>>>>
>>>>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka.
>>>>>> For new Kafka source, the recommended way is to use the bounded mode like
>>>>>> this
>>>>>>
>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>> ...
>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>                 .build();
>>>>>>
>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>
>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>
>>>>>>> On Wed, Dec 8, 2021 at 15:29, Ayush Chauhan <ay...@zomato.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <me...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ayush,
>>>>>>>>>
>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>> me a link to it?
>>>>>>>>>
>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream() as
>>>>>>>>>> now according to docs this method will no longer be used to determine the
>>>>>>>>>> end of the stream.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Arvid Heise <ar...@apache.org>.
Hi Dong,

I see your point. The main issue with dynamic EOF is that we can't run in
batch mode. That may be desired in the case of Ayush but there may be other
use cases where it's not.
Additionally, it's quite a bit of code if you implement a
KafkaRecordDeserializationSchema from scratch. There is also no obvious way
to use it from Table/SQL.

I think we should take inspiration from how PulsarSource solves it. They
have an orthogonal interface, StopCursor (we could call it StopCondition)
[1]. It has some default values (I wonder if we could implement them as
enums for easier Table integration).

Ideally, this interface would subsume OffsetsInitializer on the stopping
side. I think it was not wise to also use OffsetsInitializer for stop
offsets, as things like OffsetResetStrategy do not make any sense there.

Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having to
hand in the KafkaClient (as we do in Pulsar).

I hope I gave some pointers.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41

On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <li...@gmail.com> wrote:

> Yep,  dynamic schema change could be a good solution for the particular
> use-case mentioned by Ayush.
>
> On the other hand, I have heard of valid use-cases where we want to stop
> the job based on a control message. For example, let's say we have a Flink
> job that keeps processing stock transaction data fetched from Kafka in real
> time. Suppose the stock market closes at 4pm, we probably want the Flink
> job to stop after it has processed all the transaction data of that day,
> instead of running it for the whole day, in order to save CPU cost.
>
> As of Flink 1.13, users can achieve this goal by sending a special message
> to the Kafka topic, and encode logic in the deserializer such that Flink
> job stops when this message is observed. IMO, this seems like a reasonable
> approach to support the above use-case.
>
> One possible approach to keep supporting this use-case in Flink 1.15 is to
> allow user to signal the "end of stream" by calling Collector::close(...)
> in KafkaRecordDeserializationSchema::deserialize(..).
>
> What do you think?
>
>
>
> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
>> schema change?
>>
>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com> wrote:
>>
>>> Hi Ayush,
>>>
>>> Your use-case should be supported.  Sorry, we don't have a good way to
>>> support this in Flink 1.14.
>>>
>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>
>>> Thanks,
>>> Dong
>>>
>>>
>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ay...@zomato.com>
>>> wrote:
>>>
>>>> My usecase is that as soon as the avro message version is changed, I
>>>> want to reload the job graph so that I can update the downstream iceberg
>>>> table.
>>>>
>>>> Iceberg FlinkSink take table schema during the job start and cannot be
>>>> updated during runtime. So, I want to trigger graceful shutdown and restart
>>>> the job.
>>>>
>>>> Can I reload the job graph to achieve that?
>>>>
>>>>
>>>>
>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Ayush,
>>>>>
>>>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka.
>>>>> For new Kafka source, the recommended way is to use the bounded mode like
>>>>> this
>>>>>
>>>>> KafkaSource<PartitionAndValue> source =
>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>> ...
>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>                 .build();
>>>>>
>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>
>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>
>>>>>> When would you want to end the stream? Could you explain why you need
>>>>>> to end the kafka stream without using the offset?
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 15:29, Ayush Chauhan <ay...@zomato.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <me...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Ayush,
>>>>>>>>
>>>>>>>> I couldn't find the documentation you've mentioned. Can you send me
>>>>>>>> a link to it?
>>>>>>>>
>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>> ayush.chauhan@zomato.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Can you please let me know the alternatives of isEndOfStream() as
>>>>>>>>> now according to docs this method will no longer be used to determine the
>>>>>>>>> end of the stream.
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>  Ayush Chauhan
>>>>>>>  Data Platform
>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>
>>>>>>>
>>>>>>> This email is intended only for the person or the entity to whom it
>>>>>>> is addressed. If you are not the intended recipient, please delete this
>>>>>>> email and contact the sender.
>>>>>>>
>>>>>>
>>>>
>>>> --
>>>>  Ayush Chauhan
>>>>  Data Platform
>>>>  [image: mobile-icon]  +91 9990747111
>>>>
>>>>
>>>> This email is intended only for the person or the entity to whom it is
>>>> addressed. If you are not the intended recipient, please delete this email
>>>> and contact the sender.
>>>>
>>>

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Dong Lin <li...@gmail.com>.
Yep, dynamic schema change could be a good solution for the particular
use-case mentioned by Ayush.

On the other hand, I have heard of valid use-cases where we want to stop
the job based on a control message. For example, let's say we have a Flink
job that keeps processing stock transaction data fetched from Kafka in real
time. If the stock market closes at 4pm, we probably want the Flink job to
stop once it has processed all of that day's transaction data, instead of
running for the whole day, in order to save CPU cost.

As of Flink 1.13, users can achieve this by sending a special message to
the Kafka topic and encoding logic in the deserializer so that the Flink
job stops when this message is observed. IMO, this seems like a reasonable
approach to support the above use-case.
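
A minimal plain-Java model of the control-message approach described above.
It deliberately uses no Flink or Kafka classes: StoppableSchema, TradeSchema,
consume and the MARKET_CLOSED marker are hypothetical names chosen for
illustration, and the interface only mirrors the idea of an isEndOfStream()
check. In this sketch the control message itself is not emitted.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model only; none of these types are Flink or Kafka classes.
final class ControlMessageStop {

    /** Hypothetical stand-in for a deserializer that can flag the end of the stream. */
    interface StoppableSchema<T> {
        T deserialize(byte[] raw);

        boolean isEndOfStream(T element);
    }

    /** Treats the payload "MARKET_CLOSED" (an assumed marker) as the control message. */
    static final class TradeSchema implements StoppableSchema<String> {
        static final String END_MARKER = "MARKET_CLOSED";

        @Override
        public String deserialize(byte[] raw) {
            return new String(raw, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String element) {
            return END_MARKER.equals(element);
        }
    }

    /** Emits records in order and stops at the first control message, which is dropped. */
    static <T> List<T> consume(List<byte[]> records, StoppableSchema<T> schema) {
        List<T> emitted = new ArrayList<>();
        for (byte[] raw : records) {
            T element = schema.deserialize(raw);
            if (schema.isEndOfStream(element)) {
                break; // stop reading; anything after the marker is never processed
            }
            emitted.add(element);
        }
        return emitted;
    }

    /** Small helper to build UTF-8 payloads for the demo. */
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> topic = Arrays.asList(
                utf8("trade-1"), utf8("trade-2"), utf8("MARKET_CLOSED"), utf8("trade-3"));
        System.out.println(consume(topic, new TradeSchema())); // prints [trade-1, trade-2]
    }
}
```

The point of the model is that the stop condition depends on record content,
which an offset-based bound cannot express in advance.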

One possible approach to keep supporting this use-case in Flink 1.15 is to
allow users to signal the "end of stream" by calling Collector::close(...)
in KafkaRecordDeserializationSchema::deserialize(...).

What do you think?



On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:

> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
> schema change?

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Arvid Heise <ar...@apache.org>.
Wouldn't it be better to ask the Iceberg maintainers to support dynamic
schema change?

On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <li...@gmail.com> wrote:

> Hi Ayush,
>
> Your use-case should be supported.  Sorry, we don't have a good way to
> support this in Flink 1.14.
>
> I am going to propose a FLIP to fix it in Flink 1.15.
>
> Thanks,
> Dong

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Dong Lin <li...@gmail.com>.
Hi Ayush,

Your use-case should be supported. Sorry, we don't have a good way to
support this in Flink 1.14.

I am going to propose a FLIP to fix it in Flink 1.15.

Thanks,
Dong


On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ay...@zomato.com>
wrote:

> My usecase is that as soon as the avro message version is changed, I want
> to reload the job graph so that I can update the downstream iceberg table.
>
> Iceberg FlinkSink take table schema during the job start and cannot be
> updated during runtime. So, I want to trigger graceful shutdown and restart
> the job.
>
> Can I reload the job graph to achieve that?

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Ayush Chauhan <ay...@zomato.com>.
My use case is that as soon as the Avro message version changes, I want
to reload the job graph so that I can update the downstream Iceberg table.

The Iceberg FlinkSink takes the table schema at job start, and it cannot be
updated at runtime. So, I want to trigger a graceful shutdown and restart
the job.

Can I reload the job graph to achieve that?
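
The trigger condition described above could be sketched in plain Java as
follows. This is only a model under stated assumptions: each record is
assumed to expose an integer schema version, and SchemaVersionWatcher and
firstChangedIndex are hypothetical names, not Flink, Avro, or Iceberg APIs.
How the job is then stopped and restarted is outside this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model only; the integer "version" per record is an assumption.
final class SchemaVersionWatcher {
    private Integer baseline; // version of the first record seen; null until then

    /** Returns true when a record carries a version different from the first one seen. */
    boolean hasChanged(int version) {
        if (baseline == null) {
            baseline = version;
            return false;
        }
        return baseline != version;
    }

    /** Index of the first record whose version differs from the first record, or -1. */
    static int firstChangedIndex(List<Integer> versions) {
        SchemaVersionWatcher watcher = new SchemaVersionWatcher();
        for (int i = 0; i < versions.size(); i++) {
            if (watcher.hasChanged(versions.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstChangedIndex(Arrays.asList(1, 1, 1, 2, 2))); // prints 3
    }
}
```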



On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Ayush,
>
> DeserializationSchema.isEndOfStream was only ever supported by Kafka. For
> new Kafka source, the recommended way is to use the bounded mode like this
>
> KafkaSource<PartitionAndValue> source =
>         KafkaSource.<PartitionAndValue>builder()
> ...
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .setBounded(OffsetsInitializer.latest())
>                 .build();
>
> You can implement your own OffsetsInitializer or use a provided one.

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Arvid Heise <ar...@apache.org>.
Hi Ayush,

DeserializationSchema.isEndOfStream was only ever supported by Kafka. For
the new KafkaSource, the recommended way is to use bounded mode, like this:

KafkaSource<PartitionAndValue> source =
        KafkaSource.<PartitionAndValue>builder()
...
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .build();

You can implement your own OffsetsInitializer or use a provided one.
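
To illustrate how an offset bound differs from isEndOfStream(), here is a
plain-Java model (no Flink or Kafka classes; BoundedReadModel and
readBounded are hypothetical names). It assumes, as a simplification, that
the stopping offset is fixed once before reading starts, in the way a
"latest" bound marks an end position: the reader stops at that position
regardless of record content, and records appended afterwards are not read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model only; a List stands in for a single Kafka partition.
final class BoundedReadModel {

    /** Reads offsets [startOffset, stoppingOffset), never past the end of the log. */
    static List<String> readBounded(List<String> log, int startOffset, int stoppingOffset) {
        List<String> out = new ArrayList<>();
        for (int offset = startOffset; offset < stoppingOffset && offset < log.size(); offset++) {
            out.add(log.get(offset));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(Arrays.asList("r0", "r1", "r2"));
        int stoppingOffset = log.size(); // snapshot of the end position, taken before reading
        log.add("r3");                   // appended after the snapshot, so it is not read
        System.out.println(readBounded(log, 0, stoppingOffset)); // prints [r0, r1, r2]
    }
}
```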

On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ru...@gmail.com> wrote:

> There is no way to end the kafka stream from the deserializer.
>
> When would you want to end the stream? Could you explain why you need to
> end the kafka stream without using the offset?

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Hang Ruan <ru...@gmail.com>.
There is no way to end the Kafka stream from the deserializer.

When would you want to end the stream? Could you explain why you need to
end the Kafka stream without using the offset?

On Wed, Dec 8, 2021 at 15:29, Ayush Chauhan <ay...@zomato.com> wrote:

>
> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Ayush Chauhan <ay...@zomato.com>.
https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69



On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <me...@gmail.com> wrote:

> Hi Ayush,
>
> I couldn't find the documentation you've mentioned. Can you send me a link
> to it?

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

Posted by Robert Metzger <me...@gmail.com>.
Hi Ayush,

I couldn't find the documentation you've mentioned. Can you send me a link
to it?

On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <ay...@zomato.com>
wrote:

> Hi,
>
> Can you please let me know the alternatives of isEndOfStream() as now
> according to docs this method will no longer be used to determine the end
> of the stream.