Posted to dev@flink.apache.org by Gyula Fóra <gy...@apache.org> on 2015/07/07 15:57:35 UTC

Rework of streaming iteration API

Hey,

Along with the suggested changes to the streaming API structure, I think we
should also rework the "iteration" API. Currently the iteration API tries
to mimic the syntax of the batch API, while the runtime behaviour is quite
different.

What we create instead of iterations is really just cyclic streams (loops
in the streaming job), so the API should make this behaviour intuitive.

I suggest removing the explicit iterate call and instead adding a method to
the StreamOperators that allows connecting feedback inputs (creating loops).
It would look like this:

A mapper that does nothing but iterates over some filtered input:

*Current API :*
DataStream source = ..
IterativeDataStream it = source.iterate()
DataStream mapper = it.map(noOpMapper)
DataStream feedback = mapper.filter(...)
it.closeWith(feedback)

*Suggested API :*
DataStream source = ..
DataStream mapper = source.map(noOpMapper)
DataStream feedback = mapper.filter(...)
mapper.addInput(feedback)

The suggested approach would let us define inputs to operators after they
are created and implicitly union them with the normal input. I think this
is a much clearer approach than what we have now.
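
To make the idea of attaching an input after creation concrete, here is a
minimal, self-contained sketch in plain Java. It does not use Flink: ToyOp,
addInput and AddInputDemo are hypothetical names invented for illustration,
and the mapper decrements its input (instead of being a no-op) only so that
the toy loop terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical toy operator, NOT Flink's StreamOperator.
final class ToyOp {
    private final Function<Integer, Optional<Integer>> fn;
    private final List<ToyOp> outputs = new ArrayList<>();
    final List<Integer> seen = new ArrayList<>(); // every element this operator received

    ToyOp(Function<Integer, Optional<Integer>> fn) {
        this.fn = fn;
    }

    // Wire another operator's output into this operator's input after creation.
    void addInput(ToyOp upstream) {
        upstream.outputs.add(this);
    }

    // Process one element and schedule the result for all downstream operators.
    void process(int value, Deque<Runnable> work) {
        seen.add(value);
        fn.apply(value).ifPresent(out -> {
            for (ToyOp next : outputs) {
                work.add(() -> next.process(out, work));
            }
        });
    }
}

final class AddInputDemo {
    static List<Integer> run(int start) {
        ToyOp mapper = new ToyOp(x -> Optional.of(x - 1));
        ToyOp feedback = new ToyOp(x -> x > 0 ? Optional.of(x) : Optional.empty());
        feedback.addInput(mapper); // plays the role of mapper.filter(...)
        mapper.addInput(feedback); // closes the loop
        Deque<Runnable> work = new ArrayDeque<>();
        work.add(() -> mapper.process(start, work)); // the "source" emits one element
        while (!work.isEmpty()) {
            work.poll().run();
        }
        return mapper.seen;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // elements the mapper received, in order
    }
}
```

In this toy model the feedback is unioned with whatever else reaches the
mapper's input, which is the behaviour the proposal describes.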

What do you think?

Gyula

Re: Rework of streaming iteration API

Posted by Kostas Tzoumas <kt...@apache.org>.
+1 for rethinking the iterations in DataStream

However, wouldn't this proposal allow the definition of arbitrary loops
(e.g., nested loops) that are not well behaved afaik?

On Tue, Jul 7, 2015 at 4:12 PM, Stephan Ewen <se...@apache.org> wrote:

> I see that the newly proposed API makes some things easier to define.
>
> There is one source of confusion, though, in my opinion:
>
> The new API suggests that the data stream actually refers to the operator
> that created it.
> The "DataStream mapper = source.map(noOpMapper)" here refers to the map
> operator, not to the result of the map function.
>
> When adding the feedback input, you add the input to the stream before the
> stream that you call "addInput()" on. Here, the feedback actually gets
> unioned with the source data stream, not with the result of the mapper.
> This seems very weird to me.
>
> What happens here:
>
> DataStream source = env.createStream(myKafkaConnector);
> DataStream mapper = source.map(noOpMapper)
> source.addInput(feedback)
>
> or here:
>
> DataStream source1 = env.createStream(myKafkaConnector);
> DataStream source2 = env.createStream(myKafkaConnector);
>
> DataStream joined =
> source1.keyBy(...).join(source2.keyBy(...)).onWindow(...);
> DataStream feedback = joined.map(someMapper);
> joined.addInput(feedback);

Re: Rework of streaming iteration API

Posted by Stephan Ewen <se...@apache.org>.
I see that the newly proposed API makes some things easier to define.

There is one source of confusion, though, in my opinion:

The new API suggests that the data stream actually refers to the operator
that created it.
The "DataStream mapper = source.map(noOpMapper)" here refers to the map
operator, not to the result of the map function.

When adding the feedback input, you add the input to the stream before the
stream that you call "addInput()" on. Here, the feedback actually gets
unioned with the source data stream, not with the result of the mapper.
This seems very weird to me.

What happens here:

DataStream source = env.createStream(myKafkaConnector);
DataStream mapper = source.map(noOpMapper)
source.addInput(feedback)

or here:

DataStream source1 = env.createStream(myKafkaConnector);
DataStream source2 = env.createStream(myKafkaConnector);

DataStream joined =
source1.keyBy(...).join(source2.keyBy(...)).onWindow(...);
DataStream feedback = joined.map(someMapper);
joined.addInput(feedback);

Re: Rework of streaming iteration API

Posted by Gyula Fóra <gy...@gmail.com>.
@Kostas:
I believe this new API is equivalent in expressivity to the current one.
We can define nested loops now as well.
And I also don't see nested loops as much worse in general than simple loops.

Gyula Fóra <gy...@gmail.com> wrote (on Tue, Jul 7, 2015, 16:14):

> Sorry Stephan, I meant it slightly differently. I see your point:
>
> DataStream source = ...
> SingleInputOperator mapper = source.map(...)
> mapper.addInput()
>
> So addInput would be a method of the operator, not the stream.

Re: Rework of streaming iteration API

Posted by Gyula Fóra <gy...@gmail.com>.
Sorry Stephan, I meant it slightly differently. I see your point:

DataStream source = ...
SingleInputOperator mapper = source.map(...)
mapper.addInput()

So addInput would be a method of the operator, not the stream.

Aljoscha Krettek <al...@apache.org> wrote (on Tue, Jul 7, 2015, 16:12):

> I think this would be good, yes. I was just about to open an issue for
> changing the Streaming Iteration API. :D
>
> Then we should also make the implementation very straightforward and
> simple. Right now, the implementation of the iterations is all over the
> place.

Re: Rework of streaming iteration API

Posted by Paris Carbone <pa...@kth.se>.
Good points. If we want structured loops on streams, we will need to inject iteration counters. The question is whether we really need structured iterations on plain data streams. Window iterations, on the other hand, are a must-have...
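
As a rough illustration of what injecting iteration counters could mean, the
following plain-Java sketch tags every element with the number of passes it
has made through a loop body. Counted and CounterDemo are hypothetical names,
the halving loop body is an arbitrary stand-in, and nothing here reflects how
Flink implements (or would implement) this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical wrapper: a value plus the number of passes through the loop body.
final class Counted {
    final int value;
    final int iteration;

    Counted(int value, int iteration) {
        this.value = value;
        this.iteration = iteration;
    }

    @Override
    public String toString() {
        return value + "@" + iteration;
    }
}

final class CounterDemo {
    // Halve each element; results still greater than 1 are fed back into the loop.
    static List<String> run(int... inputs) {
        Deque<Counted> queue = new ArrayDeque<>();
        for (int in : inputs) {
            queue.add(new Counted(in, 0)); // fresh input has made zero passes
        }
        List<String> results = new ArrayList<>();
        while (!queue.isEmpty()) {
            Counted c = queue.poll();
            // Loop body: halve the value and bump the counter by one pass.
            Counted out = new Counted(c.value / 2, c.iteration + 1);
            if (out.value > 1) {
                queue.add(out);                // feedback edge
            } else {
                results.add(out.toString());   // element leaves the loop
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(8, 3));
    }
}
```

With the counter attached by the system, downstream code can tell which pass
an element came from without the user threading that information through by
hand.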

Paris

> On 07 Jul 2015, at 16:43, Kostas Tzoumas <kt...@apache.org> wrote:
> 
> I see. Perhaps more important IMO is defining the semantics of stream loops
> with event time.
> 
> The reason I asked about nested is that Naiad and other designs used a
> multidimensional timestamp to capture loops: (outer loop counter, inner
> loop counter, timestamp). I assume that currently making sense of which
> iteration an element comes from is left to the user. Should we aim to
> change that with the API redesign?


Re: Rework of streaming iteration API

Posted by Kostas Tzoumas <kt...@apache.org>.
I see. Perhaps more important IMO is defining the semantics of stream loops
with event time.

The reason I asked about nested is that Naiad and other designs used a
multidimensional timestamp to capture loops: (outer loop counter, inner
loop counter, timestamp). I assume that currently making sense of which
iteration an element comes from is left to the user. Should we aim to
change that with the API redesign?
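
For illustration, the tuple (outer loop counter, inner loop counter,
timestamp) could be modelled roughly as below. This is only a sketch:
LoopTimestamp and its methods are hypothetical, and the total order used here
(event time first, then outer, then inner counter) is an assumption made for
the example, not something taken from Naiad or Flink.

```java
import java.util.Comparator;

// Hypothetical value type; neither Flink nor Naiad defines this class.
final class LoopTimestamp implements Comparable<LoopTimestamp> {
    final long outer;     // outer loop counter
    final long inner;     // inner loop counter
    final long eventTime; // event timestamp of the original element

    // Assumed total order: event time first, then outer, then inner counter.
    private static final Comparator<LoopTimestamp> ORDER =
            Comparator.comparingLong((LoopTimestamp t) -> t.eventTime)
                    .thenComparingLong(t -> t.outer)
                    .thenComparingLong(t -> t.inner);

    LoopTimestamp(long outer, long inner, long eventTime) {
        this.outer = outer;
        this.inner = inner;
        this.eventTime = eventTime;
    }

    // One more pass through the inner loop.
    LoopTimestamp nextInner() {
        return new LoopTimestamp(outer, inner + 1, eventTime);
    }

    // One more pass through the outer loop; the inner counter starts over.
    LoopTimestamp nextOuter() {
        return new LoopTimestamp(outer + 1, 0, eventTime);
    }

    @Override
    public int compareTo(LoopTimestamp other) {
        return ORDER.compare(this, other);
    }

    @Override
    public String toString() {
        return "(" + outer + ", " + inner + ", " + eventTime + ")";
    }
}

final class LoopTimestampDemo {
    public static void main(String[] args) {
        LoopTimestamp t = new LoopTimestamp(0, 0, 100);
        System.out.println(t.nextInner().nextOuter());
    }
}
```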


On Tue, Jul 7, 2015 at 4:30 PM, Gyula Fóra <gy...@gmail.com> wrote:

> Okay, I am fine with this approach as well; I see the advantages. Then we
> just need to find a suitable name for marking a "FeedbackPoint" :)

Re: Rework of streaming iteration API

Posted by Gyula Fóra <gy...@gmail.com>.
Okay, I am fine with this approach as well; I see the advantages. Then we
just need to find a suitable name for marking a "FeedbackPoint" :)

Stephan Ewen <se...@apache.org> wrote (on Tue, Jul 7, 2015, 16:28):

> In Aljoscha's approach, we would need a special mutable stream. We could do
> it like this:
>
> DataStream source = ...
>
> FeedbackPoint pt = source.createFeedbackPoint();
>
> DataStream mapper = pt.map(noOpMapper)
> DataStream feedback = mapper.filter(...)
> pt.addFeedback(feedback)
>
>
> It is basically like the current approach, with different names.
>
> I actually like the current approach, because it is explicit where streams
> could be altered in hindsight (after their definition).

Re: Rework of streaming iteration API

Posted by Stephan Ewen <se...@apache.org>.
In Aljoscha's approach, we would need a special mutable stream. We could do
it like this:

DataStream source = ...

FeedbackPoint pt = source.createFeedbackPoint();

DataStream mapper = pt.map(noOpMapper)
DataStream feedback = mapper.filter(...)
pt.addFeedback(feedback)


It is basically like the current approach, with different names.

I actually like the current approach, because it is explicit where streams
could be altered in hindsight (after their definition).
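
A rough, self-contained model of the FeedbackPoint idea in plain Java follows.
ToyStream and ToyFeedbackPoint are hypothetical classes written for this
sketch, not Flink types, and the mapper decrements its input (rather than
being a no-op) only so that the toy loop terminates. The point it tries to
show is that ordinary streams are fixed once defined, while the feedback
point is the single place where an extra input can be attached later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Hypothetical toy stream: its inputs are fixed at creation; only consumers are attached.
class ToyStream {
    private final List<IntConsumer> sinks = new ArrayList<>();

    void subscribe(IntConsumer sink) {
        sinks.add(sink);
    }

    protected void emit(int v) {
        for (IntConsumer s : sinks) {
            s.accept(v);
        }
    }

    ToyStream map(IntUnaryOperator f) {
        ToyStream out = new ToyStream();
        subscribe(v -> out.emit(f.applyAsInt(v)));
        return out;
    }

    ToyStream filter(IntPredicate p) {
        ToyStream out = new ToyStream();
        subscribe(v -> {
            if (p.test(v)) {
                out.emit(v);
            }
        });
        return out;
    }
}

// The one explicit place where an input may be added after definition.
final class ToyFeedbackPoint extends ToyStream {
    void addFeedback(ToyStream feedback) {
        feedback.subscribe(this::emit);
    }

    // Stands in for elements arriving from the source.
    void push(int v) {
        emit(v);
    }
}

final class FeedbackPointDemo {
    static List<Integer> run(int start) {
        ToyFeedbackPoint pt = new ToyFeedbackPoint();
        ToyStream mapper = pt.map(x -> x - 1);
        List<Integer> out = new ArrayList<>();
        mapper.subscribe(v -> out.add(v)); // observe everything the mapper emits
        ToyStream feedback = mapper.filter(x -> x > 0);
        pt.addFeedback(feedback);          // closes the loop
        pt.push(start);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```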


On Tue, Jul 7, 2015 at 4:20 PM, Gyula Fóra <gy...@gmail.com> wrote:

> @Aljoscha:
> Yes, that's basically my point as well. This is what happens now too, but we
> give this mutable datastream a special name: IterativeDataStream
>
> This can be handled in very different ways through the API; the goal would
> be to make something easy to use. I am fine with what we have now because I
> know how it works, but it might confuse people to call it iterate.

Re: Rework of streaming iteration API

Posted by Gyula Fóra <gy...@gmail.com>.
@Aljoscha:
Yes, that's basically my point as well. This is what happens now too, but we
give this mutable datastream a special name: IterativeDataStream

This can be handled in very different ways through the API; the goal would
be to make something easy to use. I am fine with what we have now because I
know how it works, but it might confuse people to call it iterate.

Aljoscha Krettek <al...@apache.org> wrote (on Tue, Jul 7, 2015, 16:18):

> I think it could work if we allowed a DataStream to be unioned after
> creation. For example:
>
> DataStream source = ..
> DataStream mapper = source.map(noOpMapper)
> DataStream feedback = mapper.filter(...)
> source.union(feedback)
>
> This would basically mean that a DataStream is mutable and can be extended
> after creation with more streams.

Re: Rework of streaming iteration API

Posted by Aljoscha Krettek <al...@apache.org>.
I think it could work if we allowed a DataStream to be unioned after
creation. For example:

DataStream source = ..
DataStream mapper = source.map(noOpMapper)
DataStream feedback = mapper.filter(...)
source.union(feedback)

This would basically mean that a DataStream is mutable and can be extended
after creation with more streams.

On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek <al...@apache.org> wrote:

> I think this would be good, yes. I was just about to open an issue for
> changing the Streaming Iteration API. :D
>
> Then we should also make the implementation very straightforward and
> simple. Right now, the implementation of the iterations is all over the
> place.

Re: Rework of streaming iteration API

Posted by Aljoscha Krettek <al...@apache.org>.
I think this would be good, yes. I was just about to open an issue for
changing the Streaming Iteration API. :D

Then we should also make the implementation very straightforward and
simple. Right now, the implementation of the iterations is all over the
place.
