Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/08/04 18:09:38 UTC

Understanding the semantics of SourceContext.collect

Hi,

I have a question I'd like to clarify about the semantics of how events
emitted from a source are processed downstream.

I have a custom source that offloads data from our data warehouse. In the
source I keep some state that tracks the latest timestamps that were read.
When unloading, I push the data from the warehouse to S3 and then scan the
S3 bucket prefix to know which files to read. These S3 file descriptors are
only read later, in an AbstractStreamOperator (ASO) that is the direct child
of the source, and there is a rehash boundary between the two.
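For concreteness, roughly what the source looks like (a simplified sketch;
all class and method names below are made up, and the warehouse/S3 calls are
stubbed out):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Illustrative only: unloads warehouse data to S3, lists the bucket prefix and
// emits each discovered file path downstream. The latest unloaded timestamp is
// kept in checkpointed operator state.
public class WarehouseUnloadSource extends RichSourceFunction<String>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private transient ListState<Long> latestTimestampState;
    private long latestTimestamp;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            long unloadUpTo = System.currentTimeMillis();
            String prefix = unloadToS3(latestTimestamp, unloadUpTo); // warehouse UNLOAD (stub)
            for (String s3Path : listS3Prefix(prefix)) {             // S3 LIST (stub)
                // Emit under the checkpoint lock so the emitted element and the
                // timestamp state end up in the same checkpoint.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(s3Path); // hands the record off; see the question below
                    latestTimestamp = unloadUpTo;
                }
            }
            Thread.sleep(60_000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        latestTimestampState.clear();
        latestTimestampState.add(latestTimestamp);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        latestTimestampState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("latest-timestamp", Long.class));
        for (Long ts : latestTimestampState.get()) {
            latestTimestamp = ts;
        }
    }

    private String unloadToS3(long fromTs, long toTs) {
        return "s3://bucket/prefix/"; // warehouse interaction elided
    }

    private Iterable<String> listS3Prefix(String prefix) {
        return java.util.Collections.emptyList(); // S3 listing elided
    }
}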

The ASO receives these S3 file descriptors one by one, reads them, does some
additional parsing, and sends the results downstream to various operators.
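The operator side, again as a simplified, hypothetical sketch with the actual
S3 reading and parsing elided:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Illustrative only: receives one S3 file path per element, reads and parses the
// file, and forwards each parsed row (here just a String) downstream.
public class S3FileReaderOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        String s3Path = element.getValue();
        for (String row : readAndParse(s3Path)) {
            output.collect(new StreamRecord<>(row));
        }
    }

    private Iterable<String> readAndParse(String s3Path) {
        return java.util.Collections.emptyList(); // S3 access and parsing elided
    }
}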

My question is: is there a guarantee that once an element is pushed
downstream from the source function to the ASO using SourceContext.collect,
the call will only return once the element has been processed throughout the
entire execution graph?

The reason I'm asking is that I'm doing some refactoring work on the ASO,
and I'm wondering whether the file prefixes received via the ASO's
processElement function should be stored in state somewhere until I finish
processing them. If SourceContext.collect does guarantee that, once pushed,
it only returns when the element has passed through the entire DAG (even
though some of the data needs to go over the wire), then I don't need any
additional storage for that state and can rely on the fact that the element
has been pushed synchronously end to end.

-- 
Best Regards,
Yuval Itzchakov.

Re: Understanding the semantics of SourceContext.collect

Posted by Caizhi Weng <ts...@gmail.com>.
Oh, and in batch jobs this is not guaranteed even if the whole DAG is a
single node. For example, a sort operator stores incoming records in memory
or on disk, and only after all records have arrived are they sorted and sent
downstream. So the state in your ASO is still needed.
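Very roughly, keeping that state could look something like the following
(a hypothetical variant of the operator sketched in the question; the logic
for reprocessing restored entries after a failure is left out). The operator
tracks file paths it has received but not yet finished in checkpointed
operator state instead of relying on collect() being synchronous end to end:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.List;

// Illustrative only: remembers in-flight S3 file paths in operator state so that a
// restored job knows which files still need (re)processing.
public class StatefulS3FileReaderOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private transient ListState<String> pendingFilesState;
    private final List<String> pendingFiles = new ArrayList<>();

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        pendingFilesState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-files", String.class));
        for (String path : pendingFilesState.get()) {
            pendingFiles.add(path); // files in flight when the snapshot was taken
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        pendingFilesState.update(pendingFiles);
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        String s3Path = element.getValue();
        pendingFiles.add(s3Path);
        for (String row : readAndParse(s3Path)) {
            output.collect(new StreamRecord<>(row));
        }
        pendingFiles.remove(s3Path); // done with this file, drop it from the pending set
    }

    private Iterable<String> readAndParse(String s3Path) {
        return java.util.Collections.emptyList(); // S3 access and parsing elided
    }
}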


Re: Understanding the semantics of SourceContext.collect

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

There is no such guarantee unless the whole DAG is a single node. Flink's
runtime runs each node (task) in its own thread, so different nodes (tasks)
are executed in different threads, possibly even on different machines.
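To make that concrete with the sketches from the question (class names are
hypothetical): the rehash boundary between the source and the ASO is a
network shuffle, so the two always run as separate tasks. collect() returns
once the record has been handed to the output (it may block on backpressure),
not once the downstream operator has processed it.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> files = env.addSource(new WarehouseUnloadSource());

        files.keyBy(path -> path) // rehash boundary: breaks chaining, forces a network shuffle
             .transform("s3-file-reader", Types.STRING, new StatefulS3FileReaderOperator())
             .print();

        env.execute("warehouse-offload-sketch");
    }
}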
