You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2015/05/08 10:16:41 UTC

[DISCUSS] Behaviour of Streaming Sources

Hi,
in the process of reworking the Streaming Operator model I'm also
reworking the sources in order to get rid of the loop in each source.
Right now, the interface for sources (SourceFunction) has one method:
run(). This is called when the source starts and can just output
elements at any time using the Collector interface. This does not give
the Task that runs the source a lot of control in suspending operation
for performing checkpoints or some such thing.

I thought about changing the interface to this:

interface SourceFunction<T>  {
  boolean reachedEnd();
  T next();
}

This is similar to the batch API and also to what Stephan proposes in
his pull request. I think this will not work for streaming because
sources might not have new elements to emit at the moment but might
have something to emit in the future. This is problematic because
streaming topologies are usually running indefinitely. In that case,
the reachedEnd() and next() would have to be blocking (until a new
element arrives). This again does not give the task the power to
suspend operation at will.

I propose a three function interface:

interface SourceFunction<T> {
  boolean reachedEnd():
  boolean hasNext():
  T next();
}

where the contract for the source is as follows:
 - reachedEnd() == true => stop the source
 - hasNext() == true => call next() to retrieve next element
 - hasNext() == false => call again at some later point
 - next() => retrieve next element, throw exception if no element available

I thought about allowing next() to return NULL to signal that no
element is available at the moment. This will not work because a
source might want to return NULL as an element.

What do you think? Any other ideas about solving this?

Cheers,
Aljoscha

Re: [DISCUSS] Behaviour of Streaming Sources

Posted by Stephan Ewen <se...@apache.org>.
Thanks, Marton, for the summary.

The PollingStreamSource essentially follows the API suggested by Aljoscha,
and will probably internally use a backoff sleep time (as Matthias pointed
out), so
we really have arrived at a mix of techniques ;-)

On Mon, May 11, 2015 at 4:12 PM, Márton Balassi <ba...@gmail.com>
wrote:

> We had a conversation with Stephan, Aljoscha, Gyula and Paris and converged
> on the following outline for the streaming source interface. The question
> is tricky because we need to coordinate between the actual source
> computation and triggering the checkpointing of the state of the source.
>
> We should provide two interfaces, let us refer to the as StreamSource and
> PollingStreamSource for now.
>
> StreamSource has two methods: next() and reachedEnd() as outlined in
> Stephan's recent pull request. [1] The main idea here that reachedEnd()
> should only return false when no more data will arrive and the source can
> be closed. As a consequence when no data is currently available, but might
> arrive in the future the methods of the interface can be blocking
> operations. To properly coordinate this with checkpointing the state of the
> sources these blocking calls should be able to forward
> InterruptedExceptions, so that the checkpointed can be done when triggered.
> This puts expectations on the usercode.
>
> To provide an option where this behavior is not enforceable we would
> introduce PollingStreamSource which in addition to the next() and
> reachedEnd() methods would have a hasNext() method that would return true
> when data is currently available. Here all of the functions are expected to
> return almost immediately (so that checkpointed is not delayed too much
> when triggered), but usercode is not expected to handle interrupts.
>
> [1] https://github.com/apache/flink/pull/643
>
> On Mon, May 11, 2015 at 10:37 AM, Gyula Fóra <gy...@gmail.com> wrote:
>
> > I would not go into this direction. Returning lists is messy I think. I
> > would stick with hasNext and Next returning a single element
> >
> > On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > We could also change next() to return List<T> and say that the method
> > > must not sit and wait but simply return stuff that is available
> > > without waiting while also being able to not return anything for the
> > > moment.
> > >
> > > On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax
> > > <mj...@informatik.hu-berlin.de> wrote:
> > > > You are right. That is why I pointed out this already:
> > > >
> > > >> -> You could force the UDF to return each time, be disallowing
> > > >>>> consecutive calls to Collector.out(...).
> > > >
> > > > The Storm design would avoid the "NULL-Problem" Aljoscha mentioned,
> > too.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> > > >> I think the problem with this void next() approach is exactly the
> way
> > it
> > > >> works:
> > > >>
> > > >> "Using this interface, "next()" can loop internally as long
> > > >> as tuples are available and return if there is (currently) no
> input."
> > > >>
> > > >> We dont want the user to loop internally in the next because then we
> > > have
> > > >> almost the same problem as now with the run(). We want to do
> snapshots
> > > >> between 2 produced source elements, roughly the same time at all the
> > > >> sources so we cannot afford waiting for some random user behaviour
> to
> > > >> finish.
> > > >>
> > > >>
> > > >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> > > >> mjsax@informatik.hu-berlin.de> wrote:
> > > >>
> > > >>> Did you consider the Storm way to handle this?
> > > >>>
> > > >>> Storm offers a method "void next()" that uses a collector object to
> > > emit
> > > >>> new tuples. Using this interface, "next()" can loop internally as
> > long
> > > >>> as tuples are available and return if there is (currently) no
> input.
> > > >>> What I have seen, people tend to emit a single tuple an leave
> next()
> > > >>> immediately, because Storm call next() in an infinite loop anyway.
> > > >>> -> You could force the UDF to return each time, be disallowing
> > > >>> consecutive calls to Collector.out(...).
> > > >>>
> > > >>> If next() is called by the system and it returns, it is easy to
> check
> > > if
> > > >>> the out(..) method of the collector object was called at least
> once.
> > If
> > > >>> the recored was emitted, Storm "sleeps" for a while before calling
> > > >>> next() again, to avoid busy waiting. The sleeping time is increased
> > for
> > > >>> consecutive "empty" next() calls and reset the first time next()
> > emits
> > > >>> records again.
> > > >>>
> > > >>> I like this interface, because it's very simple and would prefer it
> > > over
> > > >>> an interface with many methods.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> > > >>>> Hi,
> > > >>>> in the process of reworking the Streaming Operator model I'm also
> > > >>>> reworking the sources in order to get rid of the loop in each
> > source.
> > > >>>> Right now, the interface for sources (SourceFunction) has one
> > method:
> > > >>>> run(). This is called when the source starts and can just output
> > > >>>> elements at any time using the Collector interface. This does not
> > give
> > > >>>> the Task that runs the source a lot of control in suspending
> > operation
> > > >>>> for performing checkpoints or some such thing.
> > > >>>>
> > > >>>> I thought about changing the interface to this:
> > > >>>>
> > > >>>> interface SourceFunction<T>  {
> > > >>>>   boolean reachedEnd();
> > > >>>>   T next();
> > > >>>> }
> > > >>>>
> > > >>>> This is similar to the batch API and also to what Stephan proposes
> > in
> > > >>>> his pull request. I think this will not work for streaming because
> > > >>>> sources might not have new elements to emit at the moment but
> might
> > > >>>> have something to emit in the future. This is problematic because
> > > >>>> streaming topologies are usually running indefinitely. In that
> case,
> > > >>>> the reachedEnd() and next() would have to be blocking (until a new
> > > >>>> element arrives). This again does not give the task the power to
> > > >>>> suspend operation at will.
> > > >>>>
> > > >>>> I propose a three function interface:
> > > >>>>
> > > >>>> interface SourceFunction<T> {
> > > >>>>   boolean reachedEnd():
> > > >>>>   boolean hasNext():
> > > >>>>   T next();
> > > >>>> }
> > > >>>>
> > > >>>> where the contract for the source is as follows:
> > > >>>>  - reachedEnd() == true => stop the source
> > > >>>>  - hasNext() == true => call next() to retrieve next element
> > > >>>>  - hasNext() == false => call again at some later point
> > > >>>>  - next() => retrieve next element, throw exception if no element
> > > >>> available
> > > >>>>
> > > >>>> I thought about allowing next() to return NULL to signal that no
> > > >>>> element is available at the moment. This will not work because a
> > > >>>> source might want to return NULL as an element.
> > > >>>>
> > > >>>> What do you think? Any other ideas about solving this?
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Aljoscha
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] Behaviour of Streaming Sources

Posted by Márton Balassi <ba...@gmail.com>.
We had a conversation with Stephan, Aljoscha, Gyula and Paris and converged
on the following outline for the streaming source interface. The question
is tricky because we need to coordinate between the actual source
computation and triggering the checkpointing of the state of the source.

We should provide two interfaces, let us refer to the as StreamSource and
PollingStreamSource for now.

StreamSource has two methods: next() and reachedEnd() as outlined in
Stephan's recent pull request. [1] The main idea here that reachedEnd()
should only return false when no more data will arrive and the source can
be closed. As a consequence when no data is currently available, but might
arrive in the future the methods of the interface can be blocking
operations. To properly coordinate this with checkpointing the state of the
sources these blocking calls should be able to forward
InterruptedExceptions, so that the checkpointed can be done when triggered.
This puts expectations on the usercode.

To provide an option where this behavior is not enforceable we would
introduce PollingStreamSource which in addition to the next() and
reachedEnd() methods would have a hasNext() method that would return true
when data is currently available. Here all of the functions are expected to
return almost immediately (so that checkpointed is not delayed too much
when triggered), but usercode is not expected to handle interrupts.

[1] https://github.com/apache/flink/pull/643

On Mon, May 11, 2015 at 10:37 AM, Gyula Fóra <gy...@gmail.com> wrote:

> I would not go into this direction. Returning lists is messy I think. I
> would stick with hasNext and Next returning a single element
>
> On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > We could also change next() to return List<T> and say that the method
> > must not sit and wait but simply return stuff that is available
> > without waiting while also being able to not return anything for the
> > moment.
> >
> > On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax
> > <mj...@informatik.hu-berlin.de> wrote:
> > > You are right. That is why I pointed out this already:
> > >
> > >> -> You could force the UDF to return each time, be disallowing
> > >>>> consecutive calls to Collector.out(...).
> > >
> > > The Storm design would avoid the "NULL-Problem" Aljoscha mentioned,
> too.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> > >> I think the problem with this void next() approach is exactly the way
> it
> > >> works:
> > >>
> > >> "Using this interface, "next()" can loop internally as long
> > >> as tuples are available and return if there is (currently) no input."
> > >>
> > >> We dont want the user to loop internally in the next because then we
> > have
> > >> almost the same problem as now with the run(). We want to do snapshots
> > >> between 2 produced source elements, roughly the same time at all the
> > >> sources so we cannot afford waiting for some random user behaviour to
> > >> finish.
> > >>
> > >>
> > >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> > >> mjsax@informatik.hu-berlin.de> wrote:
> > >>
> > >>> Did you consider the Storm way to handle this?
> > >>>
> > >>> Storm offers a method "void next()" that uses a collector object to
> > emit
> > >>> new tuples. Using this interface, "next()" can loop internally as
> long
> > >>> as tuples are available and return if there is (currently) no input.
> > >>> What I have seen, people tend to emit a single tuple an leave next()
> > >>> immediately, because Storm call next() in an infinite loop anyway.
> > >>> -> You could force the UDF to return each time, be disallowing
> > >>> consecutive calls to Collector.out(...).
> > >>>
> > >>> If next() is called by the system and it returns, it is easy to check
> > if
> > >>> the out(..) method of the collector object was called at least once.
> If
> > >>> the recored was emitted, Storm "sleeps" for a while before calling
> > >>> next() again, to avoid busy waiting. The sleeping time is increased
> for
> > >>> consecutive "empty" next() calls and reset the first time next()
> emits
> > >>> records again.
> > >>>
> > >>> I like this interface, because it's very simple and would prefer it
> > over
> > >>> an interface with many methods.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> > >>>> Hi,
> > >>>> in the process of reworking the Streaming Operator model I'm also
> > >>>> reworking the sources in order to get rid of the loop in each
> source.
> > >>>> Right now, the interface for sources (SourceFunction) has one
> method:
> > >>>> run(). This is called when the source starts and can just output
> > >>>> elements at any time using the Collector interface. This does not
> give
> > >>>> the Task that runs the source a lot of control in suspending
> operation
> > >>>> for performing checkpoints or some such thing.
> > >>>>
> > >>>> I thought about changing the interface to this:
> > >>>>
> > >>>> interface SourceFunction<T>  {
> > >>>>   boolean reachedEnd();
> > >>>>   T next();
> > >>>> }
> > >>>>
> > >>>> This is similar to the batch API and also to what Stephan proposes
> in
> > >>>> his pull request. I think this will not work for streaming because
> > >>>> sources might not have new elements to emit at the moment but might
> > >>>> have something to emit in the future. This is problematic because
> > >>>> streaming topologies are usually running indefinitely. In that case,
> > >>>> the reachedEnd() and next() would have to be blocking (until a new
> > >>>> element arrives). This again does not give the task the power to
> > >>>> suspend operation at will.
> > >>>>
> > >>>> I propose a three function interface:
> > >>>>
> > >>>> interface SourceFunction<T> {
> > >>>>   boolean reachedEnd():
> > >>>>   boolean hasNext():
> > >>>>   T next();
> > >>>> }
> > >>>>
> > >>>> where the contract for the source is as follows:
> > >>>>  - reachedEnd() == true => stop the source
> > >>>>  - hasNext() == true => call next() to retrieve next element
> > >>>>  - hasNext() == false => call again at some later point
> > >>>>  - next() => retrieve next element, throw exception if no element
> > >>> available
> > >>>>
> > >>>> I thought about allowing next() to return NULL to signal that no
> > >>>> element is available at the moment. This will not work because a
> > >>>> source might want to return NULL as an element.
> > >>>>
> > >>>> What do you think? Any other ideas about solving this?
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
>

Re: [DISCUSS] Behaviour of Streaming Sources

Posted by Gyula Fóra <gy...@gmail.com>.
I would not go into this direction. Returning lists is messy I think. I
would stick with hasNext and Next returning a single element

On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> We could also change next() to return List<T> and say that the method
> must not sit and wait but simply return stuff that is available
> without waiting while also being able to not return anything for the
> moment.
>
> On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax
> <mj...@informatik.hu-berlin.de> wrote:
> > You are right. That is why I pointed out this already:
> >
> >> -> You could force the UDF to return each time, be disallowing
> >>>> consecutive calls to Collector.out(...).
> >
> > The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.
> >
> >
> > -Matthias
> >
> >
> >
> > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> >> I think the problem with this void next() approach is exactly the way it
> >> works:
> >>
> >> "Using this interface, "next()" can loop internally as long
> >> as tuples are available and return if there is (currently) no input."
> >>
> >> We dont want the user to loop internally in the next because then we
> have
> >> almost the same problem as now with the run(). We want to do snapshots
> >> between 2 produced source elements, roughly the same time at all the
> >> sources so we cannot afford waiting for some random user behaviour to
> >> finish.
> >>
> >>
> >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> >> mjsax@informatik.hu-berlin.de> wrote:
> >>
> >>> Did you consider the Storm way to handle this?
> >>>
> >>> Storm offers a method "void next()" that uses a collector object to
> emit
> >>> new tuples. Using this interface, "next()" can loop internally as long
> >>> as tuples are available and return if there is (currently) no input.
> >>> What I have seen, people tend to emit a single tuple an leave next()
> >>> immediately, because Storm call next() in an infinite loop anyway.
> >>> -> You could force the UDF to return each time, be disallowing
> >>> consecutive calls to Collector.out(...).
> >>>
> >>> If next() is called by the system and it returns, it is easy to check
> if
> >>> the out(..) method of the collector object was called at least once. If
> >>> the recored was emitted, Storm "sleeps" for a while before calling
> >>> next() again, to avoid busy waiting. The sleeping time is increased for
> >>> consecutive "empty" next() calls and reset the first time next() emits
> >>> records again.
> >>>
> >>> I like this interface, because it's very simple and would prefer it
> over
> >>> an interface with many methods.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> >>>> Hi,
> >>>> in the process of reworking the Streaming Operator model I'm also
> >>>> reworking the sources in order to get rid of the loop in each source.
> >>>> Right now, the interface for sources (SourceFunction) has one method:
> >>>> run(). This is called when the source starts and can just output
> >>>> elements at any time using the Collector interface. This does not give
> >>>> the Task that runs the source a lot of control in suspending operation
> >>>> for performing checkpoints or some such thing.
> >>>>
> >>>> I thought about changing the interface to this:
> >>>>
> >>>> interface SourceFunction<T>  {
> >>>>   boolean reachedEnd();
> >>>>   T next();
> >>>> }
> >>>>
> >>>> This is similar to the batch API and also to what Stephan proposes in
> >>>> his pull request. I think this will not work for streaming because
> >>>> sources might not have new elements to emit at the moment but might
> >>>> have something to emit in the future. This is problematic because
> >>>> streaming topologies are usually running indefinitely. In that case,
> >>>> the reachedEnd() and next() would have to be blocking (until a new
> >>>> element arrives). This again does not give the task the power to
> >>>> suspend operation at will.
> >>>>
> >>>> I propose a three function interface:
> >>>>
> >>>> interface SourceFunction<T> {
> >>>>   boolean reachedEnd():
> >>>>   boolean hasNext():
> >>>>   T next();
> >>>> }
> >>>>
> >>>> where the contract for the source is as follows:
> >>>>  - reachedEnd() == true => stop the source
> >>>>  - hasNext() == true => call next() to retrieve next element
> >>>>  - hasNext() == false => call again at some later point
> >>>>  - next() => retrieve next element, throw exception if no element
> >>> available
> >>>>
> >>>> I thought about allowing next() to return NULL to signal that no
> >>>> element is available at the moment. This will not work because a
> >>>> source might want to return NULL as an element.
> >>>>
> >>>> What do you think? Any other ideas about solving this?
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>
> >>>
> >>>
> >>
> >
>

Re: [DISCUSS] Behaviour of Streaming Sources

Posted by Aljoscha Krettek <al...@apache.org>.
We could also change next() to return List<T> and say that the method
must not sit and wait but simply return stuff that is available
without waiting while also being able to not return anything for the
moment.

On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax
<mj...@informatik.hu-berlin.de> wrote:
> You are right. That is why I pointed out this already:
>
>> -> You could force the UDF to return each time, be disallowing
>>>> consecutive calls to Collector.out(...).
>
> The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.
>
>
> -Matthias
>
>
>
> On 05/08/2015 10:59 AM, Gyula Fóra wrote:
>> I think the problem with this void next() approach is exactly the way it
>> works:
>>
>> "Using this interface, "next()" can loop internally as long
>> as tuples are available and return if there is (currently) no input."
>>
>> We dont want the user to loop internally in the next because then we have
>> almost the same problem as now with the run(). We want to do snapshots
>> between 2 produced source elements, roughly the same time at all the
>> sources so we cannot afford waiting for some random user behaviour to
>> finish.
>>
>>
>> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
>> mjsax@informatik.hu-berlin.de> wrote:
>>
>>> Did you consider the Storm way to handle this?
>>>
>>> Storm offers a method "void next()" that uses a collector object to emit
>>> new tuples. Using this interface, "next()" can loop internally as long
>>> as tuples are available and return if there is (currently) no input.
>>> What I have seen, people tend to emit a single tuple an leave next()
>>> immediately, because Storm call next() in an infinite loop anyway.
>>> -> You could force the UDF to return each time, be disallowing
>>> consecutive calls to Collector.out(...).
>>>
>>> If next() is called by the system and it returns, it is easy to check if
>>> the out(..) method of the collector object was called at least once. If
>>> the recored was emitted, Storm "sleeps" for a while before calling
>>> next() again, to avoid busy waiting. The sleeping time is increased for
>>> consecutive "empty" next() calls and reset the first time next() emits
>>> records again.
>>>
>>> I like this interface, because it's very simple and would prefer it over
>>> an interface with many methods.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
>>>> Hi,
>>>> in the process of reworking the Streaming Operator model I'm also
>>>> reworking the sources in order to get rid of the loop in each source.
>>>> Right now, the interface for sources (SourceFunction) has one method:
>>>> run(). This is called when the source starts and can just output
>>>> elements at any time using the Collector interface. This does not give
>>>> the Task that runs the source a lot of control in suspending operation
>>>> for performing checkpoints or some such thing.
>>>>
>>>> I thought about changing the interface to this:
>>>>
>>>> interface SourceFunction<T>  {
>>>>   boolean reachedEnd();
>>>>   T next();
>>>> }
>>>>
>>>> This is similar to the batch API and also to what Stephan proposes in
>>>> his pull request. I think this will not work for streaming because
>>>> sources might not have new elements to emit at the moment but might
>>>> have something to emit in the future. This is problematic because
>>>> streaming topologies are usually running indefinitely. In that case,
>>>> the reachedEnd() and next() would have to be blocking (until a new
>>>> element arrives). This again does not give the task the power to
>>>> suspend operation at will.
>>>>
>>>> I propose a three function interface:
>>>>
>>>> interface SourceFunction<T> {
>>>>   boolean reachedEnd():
>>>>   boolean hasNext():
>>>>   T next();
>>>> }
>>>>
>>>> where the contract for the source is as follows:
>>>>  - reachedEnd() == true => stop the source
>>>>  - hasNext() == true => call next() to retrieve next element
>>>>  - hasNext() == false => call again at some later point
>>>>  - next() => retrieve next element, throw exception if no element
>>> available
>>>>
>>>> I thought about allowing next() to return NULL to signal that no
>>>> element is available at the moment. This will not work because a
>>>> source might want to return NULL as an element.
>>>>
>>>> What do you think? Any other ideas about solving this?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>
>>>
>>
>

Re: [DISCUSS] Behaviour of Streaming Sources

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
You are right. That is why I pointed out this already:

> -> You could force the UDF to return each time, be disallowing
>>> consecutive calls to Collector.out(...).

The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.


-Matthias



On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> I think the problem with this void next() approach is exactly the way it
> works:
> 
> "Using this interface, "next()" can loop internally as long
> as tuples are available and return if there is (currently) no input."
> 
> We dont want the user to loop internally in the next because then we have
> almost the same problem as now with the run(). We want to do snapshots
> between 2 produced source elements, roughly the same time at all the
> sources so we cannot afford waiting for some random user behaviour to
> finish.
> 
> 
> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> mjsax@informatik.hu-berlin.de> wrote:
> 
>> Did you consider the Storm way to handle this?
>>
>> Storm offers a method "void next()" that uses a collector object to emit
>> new tuples. Using this interface, "next()" can loop internally as long
>> as tuples are available and return if there is (currently) no input.
>> What I have seen, people tend to emit a single tuple an leave next()
>> immediately, because Storm call next() in an infinite loop anyway.
>> -> You could force the UDF to return each time, be disallowing
>> consecutive calls to Collector.out(...).
>>
>> If next() is called by the system and it returns, it is easy to check if
>> the out(..) method of the collector object was called at least once. If
>> the recored was emitted, Storm "sleeps" for a while before calling
>> next() again, to avoid busy waiting. The sleeping time is increased for
>> consecutive "empty" next() calls and reset the first time next() emits
>> records again.
>>
>> I like this interface, because it's very simple and would prefer it over
>> an interface with many methods.
>>
>>
>> -Matthias
>>
>>
>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
>>> Hi,
>>> in the process of reworking the Streaming Operator model I'm also
>>> reworking the sources in order to get rid of the loop in each source.
>>> Right now, the interface for sources (SourceFunction) has one method:
>>> run(). This is called when the source starts and can just output
>>> elements at any time using the Collector interface. This does not give
>>> the Task that runs the source a lot of control in suspending operation
>>> for performing checkpoints or some such thing.
>>>
>>> I thought about changing the interface to this:
>>>
>>> interface SourceFunction<T>  {
>>>   boolean reachedEnd();
>>>   T next();
>>> }
>>>
>>> This is similar to the batch API and also to what Stephan proposes in
>>> his pull request. I think this will not work for streaming because
>>> sources might not have new elements to emit at the moment but might
>>> have something to emit in the future. This is problematic because
>>> streaming topologies are usually running indefinitely. In that case,
>>> the reachedEnd() and next() would have to be blocking (until a new
>>> element arrives). This again does not give the task the power to
>>> suspend operation at will.
>>>
>>> I propose a three function interface:
>>>
>>> interface SourceFunction<T> {
>>>   boolean reachedEnd():
>>>   boolean hasNext():
>>>   T next();
>>> }
>>>
>>> where the contract for the source is as follows:
>>>  - reachedEnd() == true => stop the source
>>>  - hasNext() == true => call next() to retrieve next element
>>>  - hasNext() == false => call again at some later point
>>>  - next() => retrieve next element, throw exception if no element
>> available
>>>
>>> I thought about allowing next() to return NULL to signal that no
>>> element is available at the moment. This will not work because a
>>> source might want to return NULL as an element.
>>>
>>> What do you think? Any other ideas about solving this?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>
>>
> 


Re: [DISCUSS] Behaviour of Streaming Sources

Posted by Gyula Fóra <gy...@gmail.com>.
I think the problem with this void next() approach is exactly the way it
works:

"Using this interface, "next()" can loop internally as long
as tuples are available and return if there is (currently) no input."

We dont want the user to loop internally in the next because then we have
almost the same problem as now with the run(). We want to do snapshots
between 2 produced source elements, roughly the same time at all the
sources so we cannot afford waiting for some random user behaviour to
finish.


On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
mjsax@informatik.hu-berlin.de> wrote:

> Did you consider the Storm way to handle this?
>
> Storm offers a method "void next()" that uses a collector object to emit
> new tuples. Using this interface, "next()" can loop internally as long
> as tuples are available and return if there is (currently) no input.
> What I have seen, people tend to emit a single tuple an leave next()
> immediately, because Storm call next() in an infinite loop anyway.
> -> You could force the UDF to return each time, be disallowing
> consecutive calls to Collector.out(...).
>
> If next() is called by the system and it returns, it is easy to check if
> the out(..) method of the collector object was called at least once. If
> the recored was emitted, Storm "sleeps" for a while before calling
> next() again, to avoid busy waiting. The sleeping time is increased for
> consecutive "empty" next() calls and reset the first time next() emits
> records again.
>
> I like this interface, because it's very simple and would prefer it over
> an interface with many methods.
>
>
> -Matthias
>
>
> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> > Hi,
> > in the process of reworking the Streaming Operator model I'm also
> > reworking the sources in order to get rid of the loop in each source.
> > Right now, the interface for sources (SourceFunction) has one method:
> > run(). This is called when the source starts and can just output
> > elements at any time using the Collector interface. This does not give
> > the Task that runs the source a lot of control in suspending operation
> > for performing checkpoints or some such thing.
> >
> > I thought about changing the interface to this:
> >
> > interface SourceFunction<T>  {
> >   boolean reachedEnd();
> >   T next();
> > }
> >
> > This is similar to the batch API and also to what Stephan proposes in
> > his pull request. I think this will not work for streaming because
> > sources might not have new elements to emit at the moment but might
> > have something to emit in the future. This is problematic because
> > streaming topologies are usually running indefinitely. In that case,
> > the reachedEnd() and next() would have to be blocking (until a new
> > element arrives). This again does not give the task the power to
> > suspend operation at will.
> >
> > I propose a three function interface:
> >
> > interface SourceFunction<T> {
> >   boolean reachedEnd():
> >   boolean hasNext():
> >   T next();
> > }
> >
> > where the contract for the source is as follows:
> >  - reachedEnd() == true => stop the source
> >  - hasNext() == true => call next() to retrieve next element
> >  - hasNext() == false => call again at some later point
> >  - next() => retrieve next element, throw exception if no element
> available
> >
> > I thought about allowing next() to return NULL to signal that no
> > element is available at the moment. This will not work because a
> > source might want to return NULL as an element.
> >
> > What do you think? Any other ideas about solving this?
> >
> > Cheers,
> > Aljoscha
> >
>
>

Re: [DISCUSS] Behaviour of Streaming Sources

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Did you consider the Storm way to handle this?

Storm offers a method "void next()" that uses a collector object to emit
new tuples. Using this interface, "next()" can loop internally as long
as tuples are available and return if there is (currently) no input.
What I have seen, people tend to emit a single tuple an leave next()
immediately, because Storm call next() in an infinite loop anyway.
-> You could force the UDF to return each time, be disallowing
consecutive calls to Collector.out(...).

If next() is called by the system and it returns, it is easy to check if
the out(..) method of the collector object was called at least once. If
the recored was emitted, Storm "sleeps" for a while before calling
next() again, to avoid busy waiting. The sleeping time is increased for
consecutive "empty" next() calls and reset the first time next() emits
records again.

I like this interface, because it's very simple and would prefer it over
an interface with many methods.


-Matthias


On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> Hi,
> in the process of reworking the Streaming Operator model I'm also
> reworking the sources in order to get rid of the loop in each source.
> Right now, the interface for sources (SourceFunction) has one method:
> run(). This is called when the source starts and can just output
> elements at any time using the Collector interface. This does not give
> the Task that runs the source a lot of control in suspending operation
> for performing checkpoints or some such thing.
> 
> I thought about changing the interface to this:
> 
> interface SourceFunction<T>  {
>   boolean reachedEnd();
>   T next();
> }
> 
> This is similar to the batch API and also to what Stephan proposes in
> his pull request. I think this will not work for streaming because
> sources might not have new elements to emit at the moment but might
> have something to emit in the future. This is problematic because
> streaming topologies are usually running indefinitely. In that case,
> the reachedEnd() and next() would have to be blocking (until a new
> element arrives). This again does not give the task the power to
> suspend operation at will.
> 
> I propose a three function interface:
> 
> interface SourceFunction<T> {
>   boolean reachedEnd():
>   boolean hasNext():
>   T next();
> }
> 
> where the contract for the source is as follows:
>  - reachedEnd() == true => stop the source
>  - hasNext() == true => call next() to retrieve next element
>  - hasNext() == false => call again at some later point
>  - next() => retrieve next element, throw exception if no element available
> 
> I thought about allowing next() to return NULL to signal that no
> element is available at the moment. This will not work because a
> source might want to return NULL as an element.
> 
> What do you think? Any other ideas about solving this?
> 
> Cheers,
> Aljoscha
>