Posted to dev@flink.apache.org by "Matthias J. Sax" <mj...@informatik.hu-berlin.de> on 2015/03/24 17:05:49 UTC

Question about Flink Streaming

Hi,

as I get more familiar with Flink streaming and do some coding, I hit a
few points that I want to discuss because I find them
counter-intuitive. Please tell me what you think about them, or clarify
what I have misunderstood.

1) Class StreamInvokable has two methods, .setup(...) and .open(...).
   -> What is the difference between them? When exactly is each of them
called? It seems that both are used to set up an operator. Why
can't they be unified?

2) The same question applies to .close() and .cancel().

3) There is a class/interface hierarchy for user-defined functions. The
top-level interface is 'Function', and there are the interface
'RichFunction' and the abstract class 'AbstractRichFunction'. For each
function type, there are user functions derived from these. So far so good.
	However, the StreamInvokable class only takes a Function as constructor
argument, which suggests that RichFunctions are not supported. Internally,
the given function is tested for being a RichFunction (using instanceof) at
certain places. This is counter-intuitive from an API point of view.
	From my OO understanding, it would be better to replace Function with
RichFunction everywhere. However, I was told that the (empty) Function
interface is necessary for lambda expressions. Thus, I would suggest
extending the API with methods that take a RichFunction as a parameter, so
that it is clear that those are supported, too.

4) There is the interface Timestamp that is used to extract a timestamp
from a record in order to create windows on a record attribute. I think
the name "Timestamp" is misleading, because the interface does not
represent a timestamp. I would rather call the interface
"TimestampExtractor" or something similar.

5) Stefan started the discussion about more tests for the streaming
component. I would additionally suggest improving the Javadoc
documentation. There are many classes and methods with missing or very brief
documentation, and it is often hard to guess what they are used for. I
would also suggest describing the interaction of components/classes and
WHY some things are implemented in a certain way. As I have background
knowledge from Stratosphere, I personally can work around it and make
sense of it (at least most of the time). However, for new contributors it
might be very hard to make sense of it and to get started
implementing new features.
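To make point 3 concrete, here is a minimal, self-contained Java sketch of the pattern described above. All type names (SketchFunction, SketchRichFunction, SketchMapFunction, SketchRichMapFunction, MapOperator, SketchStream) are hypothetical stand-ins, not Flink's actual classes. The operator only accepts the generic function type and discovers the "rich" capability via instanceof; SketchStream shows the suggested extra overload that takes a rich function explicitly, while the lambda-friendly overload stays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Function -> RichFunction -> AbstractRichFunction (not Flink's classes).
interface SketchFunction {}                       // empty marker interface, like Function

interface SketchRichFunction extends SketchFunction {
    void open();
    void close();
}

interface SketchMapFunction<I, O> extends SketchFunction {
    O map(I value);                               // single abstract method, so lambdas work
}

abstract class SketchRichMapFunction<I, O> implements SketchMapFunction<I, O>, SketchRichFunction {
    final List<String> lifecycle = new ArrayList<>();  // records lifecycle calls for inspection
    public void open()  { lifecycle.add("open"); }
    public void close() { lifecycle.add("close"); }
}

// The operator only accepts the generic type and probes for "richness" with instanceof.
final class MapOperator<I, O> {
    private final SketchMapFunction<I, O> fn;

    MapOperator(SketchMapFunction<I, O> fn) { this.fn = fn; }

    List<O> run(List<I> input) {
        SketchRichFunction rich = (fn instanceof SketchRichFunction) ? (SketchRichFunction) fn : null;
        if (rich != null) rich.open();
        try {
            List<O> out = new ArrayList<>();
            for (I value : input) out.add(fn.map(value));
            return out;
        } finally {
            if (rich != null) rich.close();
        }
    }
}

// Suggested API extension: an explicit overload makes rich-function support visible in the signature.
final class SketchStream<T> {
    private final List<T> data;

    SketchStream(List<T> data) { this.data = data; }

    // Lambda-friendly entry point: accepts any map function.
    <R> List<R> map(SketchMapFunction<T, R> fn) { return new MapOperator<>(fn).run(data); }

    // Additional overload that documents rich-function support.
    <R> List<R> map(SketchRichMapFunction<T, R> fn) { return new MapOperator<>(fn).run(data); }
}
```

A lambda such as x -> x * 2 can only match the first overload, because an abstract class is not a functional interface; passing a subclass of SketchRichMapFunction selects the more specific second overload.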


Cheers,
  Matthias



Re: Question about Flink Streaming

Posted by Gyula Fóra <gy...@gmail.com>.
The setup and open methods could be called together, but they do different
tasks, and therefore I don't see any reason why they should be in the same
method. This is a critical part of the code, so it is better to keep things
clean and separate.

The RuntimeContext refers to the operator, while the TaskContext refers to
the task itself. We don't want to expose everything in the TaskContext to
the user, because that could lead to serious problems.
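A minimal sketch of that separation, assuming a design where the task keeps a richer internal context and hands user functions only a narrow view. SketchRuntimeContext and SketchTaskContext are hypothetical names; the two getters are loosely modeled on methods of Flink's RuntimeContext, but this is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-facing context: only what a user function may safely see.
interface SketchRuntimeContext {
    String getTaskName();
    int getIndexOfThisSubtask();
}

// Hypothetical internal task context: also holds resources users must not touch.
final class SketchTaskContext {
    private final String taskName;
    private final int subtaskIndex;
    private final Map<String, Object> internalResources = new HashMap<>(); // e.g. readers, writers

    SketchTaskContext(String taskName, int subtaskIndex) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
    }

    // Runtime-only access, never handed to user code.
    void registerResource(String name, Object resource) { internalResources.put(name, resource); }

    int resourceCount() { return internalResources.size(); }

    // Hands out a narrow view; internalResources is not reachable through this interface.
    SketchRuntimeContext userView() {
        return new SketchRuntimeContext() {
            @Override public String getTaskName() { return taskName; }
            @Override public int getIndexOfThisSubtask() { return subtaskIndex; }
        };
    }
}
```

The design choice is that user code is typed against the small interface, so the compiler, rather than convention, prevents it from reaching task internals.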

OK, I will talk with Marton.

On Tue, Mar 24, 2015 at 5:54 PM, Matthias J. Sax <mjsax@informatik.hu-berlin.de> wrote:


Re: Question about Flink Streaming

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Hi Gyula,

thanks a lot. I still don't understand why setup() and open() cannot be
unified. I also don't know what the difference between RuntimeContext
and StreamTaskContext is (or, to be more precise, why not use a single
context class that unifies both).

About the renaming of the Timestamp class: Marton told me that he thinks
the name is fine and should not be changed. Before I open a JIRA for
it, you two should get in sync and decide what to do.
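For reference, a small sketch of what such an interface does and why the proposed name fits. TimestampExtractor, SensorReading and TumblingWindows are hypothetical names for illustration only; the sketch assumes timestamps are epoch milliseconds and assigns records to tumbling windows by the extracted value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical interface under the proposed name: it does not *represent* a timestamp,
// it *extracts* one from a record.
interface TimestampExtractor<T> {
    long extractTimestamp(T record);
}

// Example record with an event-time attribute (epoch milliseconds, assumed).
final class SensorReading {
    final String sensorId;
    final long eventTimeMillis;

    SensorReading(String sensorId, long eventTimeMillis) {
        this.sensorId = sensorId;
        this.eventTimeMillis = eventTimeMillis;
    }
}

final class TumblingWindows {
    // Groups records into tumbling windows keyed by window start, using the extracted timestamp.
    static <T> Map<Long, List<T>> assign(List<T> records, TimestampExtractor<T> extractor,
                                         long windowSizeMillis) {
        Map<Long, List<T>> windows = new TreeMap<>();
        for (T record : records) {
            long ts = extractor.extractTimestamp(record);
            long windowStart = ts - Math.floorMod(ts, windowSizeMillis);  // also correct for negative ts
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(record);
        }
        return windows;
    }
}
```

With a lambda such as r -> r.eventTimeMillis as the extractor, the windowing code never needs to know which record attribute carries the time.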


-Matthias



On 03/24/2015 05:38 PM, Gyula Fóra wrote:


Re: Question about Flink Streaming

Posted by Gyula Fóra <gy...@apache.org>.
Hey Matthias,

Let's see if I can clarify these things for you :)

1) The difference between setup and open is that setup sets things like
the collectors, the RuntimeContext, and everything else that will be used
by the implemented invokable and also by the rich functions. Open is called
after setup, to actually start the execution of the UDF operator.

2) Close is always called, even when the task is cancelled. In addition,
when a task is failing (maybe because other tasks are failing), the cancel
method is called and the main thread is interrupted. The point of having a
cancel method is that some invokables might require different shutdown logic
in case of failure.

3) This I need to look into...

4) You are right about the unintuitive name here; if you could open a JIRA
for this, I would appreciate that :)

5) You are absolutely right on this point, we need to spend more effort on
writing proper docs.
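The lifecycle described in points 1 and 2 can be sketched roughly as below. This is a hypothetical, single-threaded simplification (SketchInvokable, SketchCollector, TaskDriver and UpperCaseOp are invented names, not Flink's StreamInvokable API): setup() only wires in runtime resources, open() starts the UDF, close() runs in a finally block so it is always called, and cancel() gives the operator a hook for failure-specific shutdown. In the real runtime, cancel is triggered by the cancellation path and the main thread is interrupted; here the driver simply calls it when processing throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical output channel that setup() wires into the operator.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical simplification of an invokable's lifecycle; not Flink's StreamInvokable.
abstract class SketchInvokable<IN, OUT> {
    protected SketchCollector<OUT> collector;
    final List<String> log = new ArrayList<>();   // records lifecycle calls for inspection

    // setup: inject runtime resources (collector, context); no user logic runs yet.
    void setup(SketchCollector<OUT> collector) {
        this.collector = collector;
        log.add("setup");
    }

    // open: start execution of the UDF (where a rich function's open would be invoked).
    void open() { log.add("open"); }

    abstract void process(IN record) throws Exception;

    // close: always called, whether the task finished or failed.
    void close() { log.add("close"); }

    // cancel: hook for failure-specific shutdown logic.
    void cancel() { log.add("cancel"); }
}

final class TaskDriver {
    // Returns true if all records were processed, false if processing failed.
    static <IN, OUT> boolean run(SketchInvokable<IN, OUT> op, Iterable<IN> input,
                                 SketchCollector<OUT> out) {
        op.setup(out);
        try {
            op.open();
            for (IN record : input) {
                op.process(record);
            }
            return true;
        } catch (Exception e) {
            op.cancel();   // simplified: the real runtime cancels via its cancellation path
            return false;
        } finally {
            op.close();    // runs on both the success and the failure path
        }
    }
}

// Example operator: upper-cases strings and fails on an empty record.
final class UpperCaseOp extends SketchInvokable<String, String> {
    @Override
    void process(String record) {
        if (record.isEmpty()) throw new IllegalArgumentException("empty record");
        collector.collect(record.toUpperCase());
    }
}
```

Keeping setup separate means the operator is fully wired before any user code in open() runs, and keeping cancel separate from close lets an operator distinguish a normal shutdown from a failure.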

I hope I could clarify some stuff.

Cheers,
Gyula

On Tue, Mar 24, 2015 at 5:05 PM, Matthias J. Sax <mjsax@informatik.hu-berlin.de> wrote:
