Posted to user@flink.apache.org by Addison Higham <ad...@gmail.com> on 2018/08/24 16:46:33 UTC

Using a ProcessFunction as a "Source"

Hi,

I am writing a parallel source function that ideally needs to receive some
messages as control information (specifically, a state message on where to
start reading from a kinesis stream). As far as I can tell, there isn't a
way to make a sourceFunction receive input (which makes sense) so I am
thinking it makes sense to use a processFunction that will occasionally
receive control messages and mostly just output a lot of messages.

This works from an API perspective, with a few different options. I could
either:

A) have the processElement function block on calling the loop that will
produce messages, or
B) have the processElement function return (by passing the collector to a
different thread and starting the reading there), but continue to produce
messages downstream.

This obviously does raise some concerns though:

1. Does this break any assumptions Flink has about the message lifecycle?
Option A, blocking in processElement for very long periods, seems
straightforward but less than ideal, not to mention that it could not handle
any other control messages.

However, I am not sure if a processFunction sending messages after the
processElement function has returned would break some expectations Flink
has of operator lifecycles. Messages are also emitted by timers, etc., but
this would be completely outside any of those function calls as it is
started on another thread. This is obviously how most SourceFunctions work,
but it isn't clear if the same technique holds for ProcessFunctions.

2. Would this have a negative impact on backpressure downstream? Since I am
still going to be using the same collector instance, it seems like it
should ultimately work, but I wonder if there are other details I am not
aware of.

3. Is this just a terrible idea in general? It seems like I could maybe do
this by implementing directly on top of an Operator, but I am not as
familiar with that API.

Thanks in advance for any thoughts!

Addison

Re: Using a ProcessFunction as a "Source"

Posted by Aljoscha Krettek <al...@apache.org>.
As an update, there is now also this FLIP for the source refactoring: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

> On 1. Nov 2018, at 20:47, Addison Higham <ad...@gmail.com> wrote:
> 
> This is fairly stale, but getting back to this:
> 
> We ended up going the route of using the Operator API and implementing something similar to the `readFile` API with one real source function that reads out splits and then a small abstraction over the AbstractStreamOperator, a `MessagableSourceFunction`, that had a similar API to processFunction.
> 
> It is a little bit more to deal with, but wasn't too bad all told.
> 
> I am hoping to get the code cleaned up and post it at least as an example of how to use the Operator API for some more advanced use cases.
> 
> Aljoscha: That looks really interesting! I actually saw that too late to consider something like that, but seems like a good change!
> 
> Thanks for the input.
> 
> Addison


Re: Using a ProcessFunction as a "Source"

Posted by Addison Higham <ad...@gmail.com>.
This is fairly stale, but getting back to this:

We ended up going the route of using the Operator API and implementing
something similar to the `readFile` API with one real source function that
reads out splits and then a small abstraction over the
AbstractStreamOperator, a `MessagableSourceFunction`, that had a similar
API to processFunction.

It is a little bit more to deal with, but wasn't too bad all told.

I am hoping to get the code cleaned up and post it at least as an example
of how to use the Operator API for some more advanced use cases.

Aljoscha: That looks really interesting! I actually saw that too late to
consider something like that, but seems like a good change!

Thanks for the input.

Addison
On Thu, Aug 30, 2018 at 4:07 AM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Addison,
>
> for a while now different ideas about reworking the Source interface have
> been floated. I implemented a prototype that showcases my favoured approach
> for such a new interface:
> https://github.com/aljoscha/flink/commits/refactor-source-interface
>
> This basically splits the Source into two parts: a SplitEnumerator and a
> SplitReader. The enumerator is responsible for discovering what should be
> read and the reader is responsible for reading splits. In this model, the
> SplitReader does not necessarily have to sit at the beginning of the
> pipeline, it could sit somewhere in the middle and the splits don't have to
> necessarily come from the enumerator but could come from a different source.
>
> I think this could fit the use case that you're describing.
>
> Best,
> Aljoscha
>

Re: Using a ProcessFunction as a "Source"

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Addison,

for a while now different ideas about reworking the Source interface have been floated. I implemented a prototype that showcases my favoured approach for such a new interface: https://github.com/aljoscha/flink/commits/refactor-source-interface

This basically splits the Source into two parts: a SplitEnumerator and a SplitReader. The enumerator is responsible for discovering what should be read and the reader is responsible for reading splits. In this model, the SplitReader does not necessarily have to sit at the beginning of the pipeline, it could sit somewhere in the middle and the splits don't have to necessarily come from the enumerator but could come from a different source.

I think this could fit the use case that you're describing.
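
To make the split between the two roles concrete, here is a rough sketch. All type names below (Split, SplitEnumerator, SplitReader, FixedEnumerator, FakeShardReader) are made up for illustration and are not the interfaces from the prototype branch; the point is only that the reader does not care where a split comes from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical description of one unit of work, e.g. a shard and a start position.
final class Split {
    final String shardId;
    final long startPosition;

    Split(String shardId, long startPosition) {
        this.shardId = shardId;
        this.startPosition = startPosition;
    }
}

// Hypothetical: discovers what should be read.
interface SplitEnumerator {
    List<Split> discoverSplits();
}

// Hypothetical: reads the records that belong to a single split.
interface SplitReader {
    List<String> read(Split split);
}

// Assumed toy enumerator that always reports the same two splits.
class FixedEnumerator implements SplitEnumerator {
    public List<Split> discoverSplits() {
        return Arrays.asList(new Split("shard-0", 0), new Split("shard-1", 10));
    }
}

// Assumed toy reader that pretends every split holds two records.
class FakeShardReader implements SplitReader {
    public List<String> read(Split split) {
        List<String> records = new ArrayList<>();
        for (long p = split.startPosition; p < split.startPosition + 2; p++) {
            records.add(split.shardId + ":" + p);
        }
        return records;
    }
}
```

A reader built this way can be handed splits from the enumerator, or from any upstream element that carries a Split, such as a control message saying where to start reading.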

Best,
Aljoscha

> On 25. Aug 2018, at 11:45, Chesnay Schepler <ch...@apache.org> wrote:
> 
> The SourceFunction interface is rather flexible so you can do pretty much whatever you want. Exact implementation depends on whether control messages are pulled or pushed to the source; in the first case you'd simply block within "run()" on the external call, in the latter you'd have it block on a queue of some sort that is fed by another thread waiting for messages.
> 
> AFAIK you should never use the collector outside of "processElement".
> 


Re: Using a ProcessFunction as a "Source"

Posted by Chesnay Schepler <ch...@apache.org>.
The SourceFunction interface is rather flexible so you can do pretty 
much whatever you want. Exact implementation depends on whether control 
messages are pulled or pushed to the source; in the first case you'd 
simply block within "run()" on the external call, in the latter you'd 
have it block on a queue of some sort that is fed by another thread 
waiting for messages.
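
As a rough illustration of the pushed case, the loop could look something like the sketch below. It deliberately avoids Flink types: QueueDrivenSource, offerControl and the negative "stop" value are assumptions made up for this example, and a real source would also have to deal with cancellation and checkpointing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the body of a source's run() method.
// It does not use Flink's SourceFunction or SourceContext types.
class QueueDrivenSource {
    // Start positions pushed by whichever thread receives control messages.
    private final BlockingQueue<Long> control = new LinkedBlockingQueue<>();
    private final int recordsPerControl;

    QueueDrivenSource(int recordsPerControl) {
        this.recordsPerControl = recordsPerControl;
    }

    // Called from the listener thread; a negative value is an assumed stop signal.
    void offerControl(long startPosition) {
        control.add(startPosition);
    }

    // Blocks on the queue, then emits a batch of placeholder records per message.
    void run(Consumer<String> out) {
        try {
            while (true) {
                long start = control.take();
                if (start < 0) {
                    return;
                }
                for (int i = 0; i < recordsPerControl; i++) {
                    out.accept("record@" + (start + i));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop
        }
    }
}
```

The thread that calls run() is the only one that emits records; the listener thread only ever touches the queue.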

AFAIK you should never use the collector outside of "processElement".

On 25.08.2018 05:15, vino yang wrote:
> Hi Addison,
>
> There are several things I don't understand. Does your source
> generate its own messages? Why can't the source receive input? If the
> source cannot accept input, why is it called a source? Isn't the Kafka
> connector an input acting as a source?
>
> If you mean that under normal circumstances it can't receive another
> input carrying control messages, there are some ways to solve it.
>
> 1) Access external systems in your source to get or subscribe to
> control messages, such as Zookeeper.
> 2) If your source is followed by a map or process operator, then they
> can be chained together as a "big" source, and you can pass your
> control message via Flink's new feature "broadcast state". See this
> blog post for details.[1]
> 3) Mix control messages with normal messages in the same message flow.
> After the control message is parsed, the corresponding action is
> taken. Of course, this approach is not really recommended.
>
> [1]: 
> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>
> Thanks, vino.
>


Re: Using a ProcessFunction as a "Source"

Posted by vino yang <ya...@gmail.com>.
Hi Addison,

There are several things I don't understand. Does your source generate its
own messages? Why can't the source receive input? If the source cannot
accept input, why is it called a source? Isn't the Kafka connector an input
acting as a source?

If you mean that under normal circumstances it can't receive another input
carrying control messages, there are some ways to solve it.

1) Access external systems in your source to get or subscribe to control
messages, such as Zookeeper.
2) If your source is followed by a map or process operator, then they can
be chained together as a "big" source, and you can pass your control
message via Flink's new feature "broadcast state". See this blog post for
details.[1]
3) Mix control messages with normal messages in the same message flow.
After the control message is parsed, the corresponding action is taken. Of
course, this approach is not really recommended.

[1]:
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
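
For option 3, the idea is roughly the following. This is only a sketch without any Flink types: Envelope, MixedFlowHandler and the "LATEST" default are names invented for the example, and in a real job the handling logic would sit in the operator that consumes the mixed flow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical envelope that tags each element as control or data.
final class Envelope {
    final boolean isControl;
    final String payload;

    private Envelope(boolean isControl, String payload) {
        this.isControl = isControl;
        this.payload = payload;
    }

    static Envelope control(String payload) {
        return new Envelope(true, payload);
    }

    static Envelope data(String payload) {
        return new Envelope(false, payload);
    }
}

// Hypothetical handler: control messages change state, data messages are emitted.
class MixedFlowHandler {
    private String startPosition = "LATEST"; // assumed default until a control message arrives

    List<String> handle(List<Envelope> input) {
        List<String> out = new ArrayList<>();
        for (Envelope e : input) {
            if (e.isControl) {
                startPosition = e.payload; // parse the control message and act on it
            } else {
                out.add(e.payload + " (start=" + startPosition + ")");
            }
        }
        return out;
    }
}
```

The drawback is visible even in the sketch: every consumer of the flow has to know about the envelope and branch on it.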

Thanks, vino.
