Posted to dev@flink.apache.org by Gyula Fóra <gy...@apache.org> on 2015/05/20 10:50:00 UTC

[DISCUSS] Re-add record copy to chained operator calls

Hey,

The latest streaming operator rework removed the copying of outputs
before they are passed to chained operators. This breaks the previous
operator semantics, which guaranteed immutability.

I think this change makes program behaviour unpredictable from the
user's perspective, as only non-chained outputs/inputs will be immutable.
If we allow this to happen, users will start disabling chaining to get
immutability, which defeats the purpose. (Chaining should not affect program
behaviour, just increase performance.)
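
To illustrate the concern, here is a minimal sketch in plain Java. It does
not use any Flink classes: Counter, emitThree and run are hypothetical names
made up for this example, and the lambda only stands in for the hand-over
between two chained operators. The upstream side reuses one mutable object,
the downstream side buffers whatever it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ChainingDemo {
    // Simulates an upstream function that reuses one mutable object for
    // every record it emits (hypothetical, not a Flink operator).
    static void emitThree(Consumer<Counter> out) {
        Counter reused = new Counter(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            out.accept(reused);
        }
    }

    // Simulates a stateful downstream operator that stores what it receives.
    // copyOnHandOver plays the role of the copy step between chained operators.
    static List<Integer> run(boolean copyOnHandOver) {
        List<Counter> buffer = new ArrayList<>();
        emitThree(r -> buffer.add(copyOnHandOver ? r.copy() : r));
        List<Integer> seen = new ArrayList<>();
        for (Counter c : buffer) {
            seen.add(c.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("no copy: " + run(false)); // prints no copy: [3, 3, 3]
        System.out.println("copy:    " + run(true));  // prints copy:    [1, 2, 3]
    }
}

// Hypothetical mutable record type used only for this illustration.
final class Counter {
    int value;

    Counter(int value) {
        this.value = value;
    }

    Counter copy() {
        return new Counter(value);
    }
}
```

Without the copy, all three buffered entries are the same object, so the
downstream state depends on whether the two functions happen to be chained.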

In my opinion, the default setting for each operator should be immutability,
and users could override this manually if they want.

What do you think?

Regards,
Gyula

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
That's fine, you convinced me ;-)

And given a flag to deactivate it, I think it should be okay for everyone.

Once we have proper serialized window buffers, the number of copies should
go down quite a bit anyways...
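
As a rough illustration of why a serialized buffer makes an extra copy
unnecessary, here is a sketch in plain Java. SerializedBuffer is a
hypothetical class, and java.io object serialization is only a stand-in
assumption for whatever serializers the runtime would actually use. Since
only bytes are stored, later mutation of the original object cannot affect
buffered records, and every read produces a fresh object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class SerializedBufferDemo {
    // Hypothetical buffer that keeps records in serialized form only.
    static final class SerializedBuffer<T extends Serializable> {
        private final List<byte[]> slots = new ArrayList<>();

        // Serializing on insert is the only "copy" that happens.
        void add(T record) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(record);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            slots.add(bytes.toByteArray());
        }

        // Deserializing on read always yields a new object.
        @SuppressWarnings("unchecked")
        T get(int index) {
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(slots.get(index)))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        int size() {
            return slots.size();
        }
    }

    // Buffers the same mutable list three times while changing it in between.
    static List<Integer> bufferWhileMutating() {
        SerializedBuffer<ArrayList<Integer>> buffer = new SerializedBuffer<>();
        ArrayList<Integer> reused = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.clear();
            reused.add(i);
            buffer.add(reused);
        }
        List<Integer> firstElements = new ArrayList<>();
        for (int i = 0; i < buffer.size(); i++) {
            firstElements.add(buffer.get(i).get(0));
        }
        return firstElements;
    }

    public static void main(String[] args) {
        System.out.println(bufferWhileMutating()); // prints [1, 2, 3]
    }
}
```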

On Wed, May 20, 2015 at 4:29 PM, Gyula Fóra <gy...@apache.org> wrote:

> This is not about me, please don't get me wrong :)
> It would be good if other people would tell their opinions as well.
>
> I am just trying to make the point that other systems do this as well for a
> reason. Users are used to this abstraction.
>
> On Wed, May 20, 2015 at 4:18 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > I think it is fair to say that everything that Flink has in its core
> > provides immutability. The mutability effect comes only if the user
> starts
> > mutating objects across functions.
> >
> > The overhead will depend vastly on whether you are sending smaller
> records
> > or large records.
> >
> > I see you are very keen on the failsafe variant. That is fine, I'd say
> > let's go ahead.
> >
> > Then let us introduce a switch. The switch needs to work on copies for
> user
> > functions only. Until the window buffers are serialized, we need to keep
> > the copies there.
> >
> >
> >
> > On Wed, May 20, 2015 at 3:55 PM, Gyula Fóra <gy...@gmail.com>
> wrote:
> >
> > > I know it is nicer to have no-copy from a performance perspective, but
> a
> > > dataflow system with no immutability guarantee is something very hard
> to
> > > describe.
> > >
> > > > Systems like Storm and Google Dataflow have immutability guarantees, I
> > > > think for the same reason: to provide very clear, easy-to-use semantics.
> > >
> > > On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > > > A vote is the last resort. Consensus through discussion is much
> nicer.
> > > And
> > > > I think we are making progress.
> > > >
> > > > We went for the lightweight version in the batch API, because
> > > >  - there are few cases that are affected (only functions with side
> > effect
> > > > state)
> > > >  - you can always switch lightweight -> failsafe in the future (only
> > > > hardens guarantees), but not the other way around (dropping
> guarantees)
> > > > without breaking existing code.
> > > >
> > > > If you are strong on that point, I do not want to be a blocker for
> > this.
> > > I
> > > > only ask to make a well informed decision, as this behavior is as
> much
> > > part
> > > > of the API as the classname of the DataStream.
> > > >
> > > >
> > > > On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gy...@gmail.com>
> > > wrote:
> > > >
> > > > > I would go for the Failsafe option as a default behaviour with a
> > > clearly
> > > > > documented lightweight (no-copy) setting, but I think having a Vote
> > on
> > > > this
> > > > > would be the proper way of settling this question.
> > > > >
> > > > > On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <
> > aljoscha@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > I think that in the long run (maybe not too long) we will have to
> > > > > > change our stateful operators (windows, basically) to use managed
> > > > > > memory and spill to disk. (Think jobs that have sliding windows
> > over
> > > > > > > days or weeks) Then the internal operators will take care of
> > > > > > copying anyways. The problem Gyula mentioned we cannot tackle
> other
> > > > > > than by defining how user code must behave.
> > > > > >
> > > > > > On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org>
> > > > wrote:
> > > > > > > It does not mean we have to behave the same way, it is just an
> > > > > indication
> > > > > > > that well-defined behavior can allow you to mess things up.
> > > > > > >
> > > > > > > The question is now what is the default mode:
> > > > > > >  - Failsafe/Heavy (always copy)
> > > > > > >  - Performance/Lightweight (do not copy)
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <
> sewen@apache.org>
> > > > > wrote:
> > > > > > >
> > > > > > >> This is something that we can clearly define as "should not be
> > > > done".
> > > > > > >> Systems do that.
> > > > > > >> I think if you repeatedly emit (or mutate) the same object for
> > > > example
> > > > > > in
> > > > > > >> Spark, you get an RDD with completely messed up contents.
> > > > > > >>
> > > > > > >> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <
> gyfora@apache.org>
> > > > > wrote:
> > > > > > >>
> > > > > > >>> If the preceding operator is emitting a mutated object, or
> does
> > > > > > something
> > > > > > > >>> with the output object afterwards then it's a problem.
> > > > > > >>>
> > > > > > >>> Emitting the same object is a special case of this.
> > > > > > >>>
> > > > > > >>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <
> > sewen@apache.org>
> > > > > > wrote:
> > > > > > >>>
> > > > > > >>> > The case you are making is if a preceding operator in a
> chain
> > > is
> > > > > > >>> repeatedly
> > > > > > >>> > emitting the same object, and the succeeding operator is
> > > > gathering
> > > > > > the
> > > > > > >>> > objects, then it is a problem
> > > > > > >>> >
> > > > > > >>> > Or are there cases where the system itself repeatedly emits
> > the
> > > > > same
> > > > > > >>> > objects?
> > > > > > >>> >
> > > > > > >>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <
> > gyfora@apache.org
> > > >
> > > > > > wrote:
> > > > > > >>> >
> > > > > > >>> > > We are designing a system for stateful stream
> computations,
> > > > > > assuming
> > > > > > >>> long
> > > > > > >>> > > standing operators that gather and store data as the
> stream
> > > > > evolves
> > > > > > >>> > (unlike
> > > > > > >>> > > in the dataset api). Many programs, like windowing,
> > sampling
> > > > etc
> > > > > > hold
> > > > > > >>> the
> > > > > > >>> > > state in the form of past data. And without careful
> > > > understanding
> > > > > > of
> > > > > > >>> the
> > > > > > >>> > > runtime these programs will break or have unnecessary
> > copies.
> > > > > > >>> > >
> > > > > > >>> > > This is why I think immutability should be the default so
> > we
> > > > can
> > > > > > have
> > > > > > >>> a
> > > > > > >>> > > clear dataflow model with immutable streams.
> > > > > > >>> > >
> > > > > > > >>> > > I see absolutely no reason why we can't have the non-copy
> > > > version
> > > > > > as an
> > > > > > >>> > > optional setting for the users.
> > > > > > >>> > >
> > > > > > >>> > >
> > > > > > >>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <
> > > parisc@kth.se>
> > > > > > wrote:
> > > > > > >>> > >
> > > > > > >>> > > > @stephan I see your point. If we assume that operators
> do
> > > not
> > > > > > hold
> > > > > > >>> > > > references in their state to any transmitted records it
> > > works
> > > > > > fine.
> > > > > > >>> We
> > > > > > >>> > > > therefore need to make this clear to the users. I need
> to
> > > > check
> > > > > > if
> > > > > > >>> that
> > > > > > >>> > > > would break semantics in SAMOA or other integrations as
> > > well
> > > > > that
> > > > > > >>> > assume
> > > > > > >>> > > > immutability. For example in SAMOA there are often
> local
> > > > metric
> > > > > > >>> objects
> > > > > > >>> > > > that are being constantly mutated and simply forwarded
> > > > > > periodically
> > > > > > >>> to
> > > > > > >>> > > > other (possibly chained) operators that need to
> evaluate
> > > > them.
> > > > > > >>> > > >
> > > > > > >>> > > > ________________________________________
> > > > > > >>> > > > From: Gyula Fóra <gy...@apache.org>
> > > > > > >>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
> > > > > > >>> > > > To: dev@flink.apache.org
> > > > > > >>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained
> > > operator
> > > > > > calls
> > > > > > >>> > > >
> > > > > > >>> > > > "Copy before putting it into a window buffer and any
> > other
> > > > > group
> > > > > > >>> > buffer."
> > > > > > >>> > > >
> > > > > > >>> > > > Exactly my point. Any stateful operator should be able
> to
> > > > > > implement
> > > > > > >>> > > > something like this without having to worry about
> copying
> > > the
> > > > > > object
> > > > > > >>> > (and
> > > > > > >>> > > > at this point the user would need to know whether it
> > comes
> > > > from
> > > > > > the
> > > > > > >>> > > network
> > > > > > >>> > > > to avoid unnecessary copies), so I don't agree with
> > leaving
> > > > the
> > > > > > copy
> > > > > > >>> > off.
> > > > > > >>> > > >
> > > > > > >>> > > > The user can of course specify that the operator is
> > mutable
> > > > if
> > > > > he
> > > > > > >>> wants
> > > > > > >>> > > > (and he is worried about the performance), But I still
> > > think
> > > > > the
> > > > > > >>> > default
> > > > > > >>> > > > behaviour should be immutable.
> > > > > > >>> > > > We cannot force users to not hold object references and
> > > also
> > > > it
> > > > > > is a
> > > > > > >>> > > quite
> > > > > > >>> > > > unnatural way of programming in a language like java.
> > > > > > >>> > > >
> > > > > > >>> > > >
> > > > > > >>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <
> > > > > sewen@apache.org>
> > > > > > >>> > wrote:
> > > > > > >>> > > >
> > > > > > >>> > > > > I am curious why the copying is actually needed.
> > > > > > >>> > > > >
> > > > > > >>> > > > > In the batch API, we chain and do not copy and it is
> > > rather
> > > > > > >>> > > predictable.
> > > > > > >>> > > > >
> > > > > > > >>> > > > > The cornerstone of that design is to follow these
> > rules:
> > > > > > >>> > > > >
> > > > > > >>> > > > >  1) Objects read from the network or any buffer are
> > > always
> > > > > new
> > > > > > >>> > objects.
> > > > > > >>> > > > > That comes naturally when they are deserialized as
> part
> > > of
> > > > > that
> > > > > > >>> (all
> > > > > > >>> > > > > buffers store serialized)
> > > > > > >>> > > > >
> > > > > > >>> > > > >  2) After a function returned a record (or gives one
> to
> > > the
> > > > > > >>> > collector),
> > > > > > >>> > > > it
> > > > > > > >>> > > > > is given to the chain of chained operators, but after
> > it
> > > is
> > > > > > >>> through
> > > > > > >>> > the
> > > > > > >>> > > > > chain, no one else holds a reference to that object.
> > > > > > >>> > > > >      For that, it is crucial that objects are not
> > stored
> > > by
> > > > > > >>> > reference,
> > > > > > >>> > > > but
> > > > > > >>> > > > > either stored serialized, or a copy is stored.
> > > > > > >>> > > > >
> > > > > > >>> > > > > This is quite solid in the batch API. How about we
> > follow
> > > > the
> > > > > > same
> > > > > > >>> > > > paradigm
> > > > > > >>> > > > > in the streaming API. We would need to adjust the
> > > > following:
> > > > > > >>> > > > >
> > > > > > >>> > > > > 1) Do not copy between operators (I think this is the
> > > case
> > > > > > right
> > > > > > >>> now)
> > > > > > >>> > > > >
> > > > > > >>> > > > > 2) Copy before putting it into a window buffer and
> any
> > > > other
> > > > > > group
> > > > > > >>> > > > buffer.
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
> > > > > > >>> > aljoscha@apache.org
> > > > > > >>> > > >
> > > > > > >>> > > > > wrote:
> > > > > > >>> > > > >
> > > > > > >>> > > > > > Yes, in fact I anticipated this. There is one
> central
> > > > place
> > > > > > >>> where
> > > > > > >>> > we
> > > > > > >>> > > > > > can insert a copy step, in OperatorCollector in
> > > > > > OutputHandler.
> > > > > > >>> > > > > >
> > > > > > >>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <
> > > > > > parisc@kth.se>
> > > > > > >>> > > wrote:
> > > > > > >>> > > > > > > I guess it was not intended ^^.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Chaining should be transparent and not break the
> > > > > > >>> correct/expected
> > > > > > >>> > > > > > behaviour.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Paris?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <
> > > > > > mbalassi@apache.org
> > > > > > >>> >
> > > > > > >>> > > > wrote:
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > +1 for copying.
> > > > > > >>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <
> > > > > gyfora@apache.org>
> > > > > > >>> > wrote:
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Hey,
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > The latest streaming operator rework removed the
> > > > copying
> > > > > of
> > > > > > >>> the
> > > > > > >>> > > > outputs
> > > > > > >>> > > > > > > before passing them to chained operators. This
> is a
> > > > major
> > > > > > >>> break
> > > > > > >>> > for
> > > > > > >>> > > > the
> > > > > > >>> > > > > > > previous operator semantics which guaranteed
> > > > > immutability.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > I think this change leads to very indeterministic
> > > > program
> > > > > > >>> > behaviour
> > > > > > >>> > > > > from
> > > > > > >>> > > > > > > the user's perspective as only non-chained
> > > > outputs/inputs
> > > > > > >>> will be
> > > > > > > >>> > > > > > immutable.
> > > > > > >>> > > > > > > If we allow this to happen, users will start
> > > disabling
> > > > > > >>> chaining
> > > > > > >>> > to
> > > > > > >>> > > > get
> > > > > > >>> > > > > > > immutability which defeats the purpose. (chaining
> > > > should
> > > > > > not
> > > > > > >>> > affect
> > > > > > >>> > > > > > program
> > > > > > >>> > > > > > > behaviour just increase performance)
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > In my opinion the default setting for each
> operator
> > > > > should
> > > > > > be
> > > > > > >>> > > > > > immutability
> > > > > > >>> > > > > > > and the user could override this manually if
> he/she
> > > > > wants.
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > What do you think?
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > > Regards,
> > > > > > >>> > > > > > > Gyula
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > > >
> > > > > > >>> > > > > >
> > > > > > >>> > > > >
> > > > > > >>> > > >
> > > > > > >>> > >
> > > > > > >>> >
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Gyula Fóra <gy...@apache.org>.
This is not about me, please don't get me wrong :)
It would be good if other people would tell their opinions as well.

I am just trying to make the point that other systems do this as well for a
reason. Users are used to this abstraction.

On Wed, May 20, 2015 at 4:18 PM, Stephan Ewen <se...@apache.org> wrote:

> I think it is fair to say that everything that Flink has in its core
> provides immutability. The mutability effect comes only if the user starts
> mutating objects across functions.
>
> The overhead will depend vastly on whether you are sending smaller records
> or large records.
>
> I see you are very keen on the failsafe variant. That is fine, I'd say
> let's go ahead.
>
> Then let us introduce a switch. The switch needs to work on copies for user
> functions only. Until the window buffers are serialized, we need to keep
> the copies there.
>
>
>
> On Wed, May 20, 2015 at 3:55 PM, Gyula Fóra <gy...@gmail.com> wrote:
>
> > I know it is nicer to have no-copy from a performance perspective, but a
> > dataflow system with no immutability guarantee is something very hard to
> > describe.
> >
> > Systems like Storm and Google Dataflow have immutability guarantees, I
> > think for the same reason: to provide very clear, easy-to-use semantics.
> >
> > On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > A vote is the last resort. Consensus through discussion is much nicer.
> > And
> > > I think we are making progress.
> > >
> > > We went for the lightweight version in the batch API, because
> > >  - there are few cases that are affected (only functions with side
> effect
> > > state)
> > >  - you can always switch lightweight -> failsafe in the future (only
> > > hardens guarantees), but not the other way around (dropping guarantees)
> > > without breaking existing code.
> > >
> > > If you are strong on that point, I do not want to be a blocker for
> this.
> > I
> > > only ask to make a well informed decision, as this behavior is as much
> > part
> > > of the API as the classname of the DataStream.
> > >
> > >
> > > On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gy...@gmail.com>
> > wrote:
> > >
> > > > I would go for the Failsafe option as a default behaviour with a
> > clearly
> > > > documented lightweight (no-copy) setting, but I think having a Vote
> on
> > > this
> > > > would be the proper way of settling this question.
> > > >
> > > > On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > I think that in the long run (maybe not too long) we will have to
> > > > > change our stateful operators (windows, basically) to use managed
> > > > > memory and spill to disk. (Think jobs that have sliding windows
> over
> > > > > days or weeks) Then the internal operators will take care of
> > > > > copying anyways. The problem Gyula mentioned we cannot tackle other
> > > > > than by defining how user code must behave.
> > > > >
> > > > > On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > > > It does not mean we have to behave the same way, it is just an
> > > > indication
> > > > > > that well-defined behavior can allow you to mess things up.
> > > > > >
> > > > > > The question is now what is the default mode:
> > > > > >  - Failsafe/Heavy (always copy)
> > > > > >  - Performance/Lightweight (do not copy)
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org>
> > > > wrote:
> > > > > >
> > > > > >> This is something that we can clearly define as "should not be
> > > done".
> > > > > >> Systems do that.
> > > > > >> I think if you repeatedly emit (or mutate) the same object for
> > > example
> > > > > in
> > > > > >> Spark, you get an RDD with completely messed up contents.
> > > > > >>
> > > > > >> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gy...@apache.org>
> > > > wrote:
> > > > > >>
> > > > > >>> If the preceding operator is emitting a mutated object, or does
> > > > > something
> > > > > >>> with the output object afterwards then it's a problem.
> > > > > >>>
> > > > > >>> Emitting the same object is a special case of this.
> > > > > >>>
> > > > > >>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <
> sewen@apache.org>
> > > > > wrote:
> > > > > >>>
> > > > > >>> > The case you are making is if a preceding operator in a chain
> > is
> > > > > >>> repeatedly
> > > > > >>> > emitting the same object, and the succeeding operator is
> > > gathering
> > > > > the
> > > > > >>> > objects, then it is a problem
> > > > > >>> >
> > > > > >>> > Or are there cases where the system itself repeatedly emits
> the
> > > > same
> > > > > >>> > objects?
> > > > > >>> >
> > > > > >>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <
> gyfora@apache.org
> > >
> > > > > wrote:
> > > > > >>> >
> > > > > >>> > > We are designing a system for stateful stream computations,
> > > > > assuming
> > > > > >>> long
> > > > > >>> > > standing operators that gather and store data as the stream
> > > > evolves
> > > > > >>> > (unlike
> > > > > >>> > > in the dataset api). Many programs, like windowing,
> sampling
> > > etc
> > > > > hold
> > > > > >>> the
> > > > > >>> > > state in the form of past data. And without careful
> > > understanding
> > > > > of
> > > > > >>> the
> > > > > >>> > > runtime these programs will break or have unnecessary
> copies.
> > > > > >>> > >
> > > > > >>> > > This is why I think immutability should be the default so
> we
> > > can
> > > > > have
> > > > > >>> a
> > > > > >>> > > clear dataflow model with immutable streams.
> > > > > >>> > >
> > > > > >>> > > I see absolutely no reason why we can't have the non-copy
> > > version
> > > > > as an
> > > > > >>> > > optional setting for the users.
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <
> > parisc@kth.se>
> > > > > wrote:
> > > > > >>> > >
> > > > > >>> > > > @stephan I see your point. If we assume that operators do
> > not
> > > > > hold
> > > > > >>> > > > references in their state to any transmitted records it
> > works
> > > > > fine.
> > > > > >>> We
> > > > > >>> > > > therefore need to make this clear to the users. I need to
> > > check
> > > > > if
> > > > > >>> that
> > > > > >>> > > > would break semantics in SAMOA or other integrations as
> > well
> > > > that
> > > > > >>> > assume
> > > > > >>> > > > immutability. For example in SAMOA there are often local
> > > metric
> > > > > >>> objects
> > > > > >>> > > > that are being constantly mutated and simply forwarded
> > > > > periodically
> > > > > >>> to
> > > > > >>> > > > other (possibly chained) operators that need to evaluate
> > > them.
> > > > > >>> > > >
> > > > > >>> > > > ________________________________________
> > > > > >>> > > > From: Gyula Fóra <gy...@apache.org>
> > > > > >>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
> > > > > >>> > > > To: dev@flink.apache.org
> > > > > >>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained
> > operator
> > > > > calls
> > > > > >>> > > >
> > > > > >>> > > > "Copy before putting it into a window buffer and any
> other
> > > > group
> > > > > >>> > buffer."
> > > > > >>> > > >
> > > > > >>> > > > Exactly my point. Any stateful operator should be able to
> > > > > implement
> > > > > >>> > > > something like this without having to worry about copying
> > the
> > > > > object
> > > > > >>> > (and
> > > > > >>> > > > at this point the user would need to know whether it
> comes
> > > from
> > > > > the
> > > > > >>> > > network
> > > > > >>> > > > to avoid unnecessary copies), so I don't agree with
> leaving
> > > the
> > > > > copy
> > > > > >>> > off.
> > > > > >>> > > >
> > > > > >>> > > > The user can of course specify that the operator is
> mutable
> > > if
> > > > he
> > > > > >>> wants
> > > > > >>> > > > (and he is worried about the performance), But I still
> > think
> > > > the
> > > > > >>> > default
> > > > > >>> > > > behaviour should be immutable.
> > > > > >>> > > > We cannot force users to not hold object references and
> > also
> > > it
> > > > > is a
> > > > > >>> > > quite
> > > > > >>> > > > unnatural way of programming in a language like java.
> > > > > >>> > > >
> > > > > >>> > > >
> > > > > >>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <
> > > > sewen@apache.org>
> > > > > >>> > wrote:
> > > > > >>> > > >
> > > > > >>> > > > > I am curious why the copying is actually needed.
> > > > > >>> > > > >
> > > > > >>> > > > > In the batch API, we chain and do not copy and it is
> > rather
> > > > > >>> > > predictable.
> > > > > >>> > > > >
> > > > > >>> > > > > The cornerstone of that design is to follow these
> rules:
> > > > > >>> > > > >
> > > > > >>> > > > >  1) Objects read from the network or any buffer are
> > always
> > > > new
> > > > > >>> > objects.
> > > > > >>> > > > > That comes naturally when they are deserialized as part
> > of
> > > > that
> > > > > >>> (all
> > > > > >>> > > > > buffers store serialized)
> > > > > >>> > > > >
> > > > > >>> > > > >  2) After a function returned a record (or gives one to
> > the
> > > > > >>> > collector),
> > > > > >>> > > > it
> > > > > >>> > > > > is given to the chain of chained operators, but after
> it
> > is
> > > > > >>> through
> > > > > >>> > the
> > > > > >>> > > > > chain, no one else holds a reference to that object.
> > > > > >>> > > > >      For that, it is crucial that objects are not
> stored
> > by
> > > > > >>> > reference,
> > > > > >>> > > > but
> > > > > >>> > > > > either stored serialized, or a copy is stored.
> > > > > >>> > > > >
> > > > > >>> > > > > This is quite solid in the batch API. How about we
> follow
> > > the
> > > > > same
> > > > > >>> > > > paradigm
> > > > > >>> > > > > in the streaming API. We would need to adjust the
> > > following:
> > > > > >>> > > > >
> > > > > >>> > > > > 1) Do not copy between operators (I think this is the
> > case
> > > > > right
> > > > > >>> now)
> > > > > >>> > > > >
> > > > > >>> > > > > 2) Copy before putting it into a window buffer and any
> > > other
> > > > > group
> > > > > >>> > > > buffer.
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
> > > > > >>> > aljoscha@apache.org
> > > > > >>> > > >
> > > > > >>> > > > > wrote:
> > > > > >>> > > > >
> > > > > >>> > > > > > Yes, in fact I anticipated this. There is one central
> > > place
> > > > > >>> where
> > > > > >>> > we
> > > > > >>> > > > > > can insert a copy step, in OperatorCollector in
> > > > > OutputHandler.
> > > > > >>> > > > > >
> > > > > >>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <
> > > > > parisc@kth.se>
> > > > > >>> > > wrote:
> > > > > >>> > > > > > > I guess it was not intended ^^.
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > Chaining should be transparent and not break the
> > > > > >>> correct/expected
> > > > > >>> > > > > > behaviour.
> > > > > >>> > > > > > >
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > Paris?
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <
> > > > > mbalassi@apache.org
> > > > > >>> >
> > > > > >>> > > > wrote:
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > +1 for copying.
> > > > > >>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <
> > > > gyfora@apache.org>
> > > > > >>> > wrote:
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > Hey,
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > The latest streaming operator rework removed the
> > > copying
> > > > of
> > > > > >>> the
> > > > > >>> > > > outputs
> > > > > >>> > > > > > > before passing them to chained operators. This is a
> > > major
> > > > > >>> break
> > > > > >>> > for
> > > > > >>> > > > the
> > > > > >>> > > > > > > previous operator semantics which guaranteed
> > > > immutability.
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > I think this change leads to very indeterministic
> > > program
> > > > > >>> > behaviour
> > > > > >>> > > > > from
> > > > > >>> > > > > > > the user's perspective as only non-chained
> > > outputs/inputs
> > > > > >>> will be
> > > > > >>> > > > > > immutable.
> > > > > >>> > > > > > > If we allow this to happen, users will start
> > disabling
> > > > > >>> chaining
> > > > > >>> > to
> > > > > >>> > > > get
> > > > > >>> > > > > > > immutability which defeats the purpose. (chaining
> > > should
> > > > > not
> > > > > >>> > affect
> > > > > >>> > > > > > program
> > > > > >>> > > > > > > behaviour just increase performance)
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > In my opinion the default setting for each operator
> > > > should
> > > > > be
> > > > > >>> > > > > > immutability
> > > > > >>> > > > > > > and the user could override this manually if he/she
> > > > wants.
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > What do you think?
> > > > > >>> > > > > > >
> > > > > >>> > > > > > > Regards,
> > > > > >>> > > > > > > Gyula
> > > > > >>> > > > > > >
> > > > > >>> > > > > > >
> > > > > >>> > > > > >
> > > > > >>> > > > >
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
I think it is fair to say that everything that Flink has in its core
provides immutability. The mutability effect comes only if the user starts
mutating objects across functions.

The overhead will depend heavily on whether you are sending small records
or large records.

I see you are very keen on the failsafe variant. That is fine, I'd say
let's go ahead.

Then let us introduce a switch. The switch needs to work on copies for user
functions only. Until the window buffers are serialized, we need to keep
the copies there.
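
A switch of this kind could look roughly like the following sketch. It is
plain Java with hypothetical names (ChainedOutput, copyEnabled, copier), not
the actual OperatorCollector code and not an existing Flink setting. The
wrapper sits between a user function and the next chained operator and
copies the record only when the flag is on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

final class CopySwitchDemo {
    // Hypothetical output wrapper placed between two chained operators.
    static final class ChainedOutput<T> implements Consumer<T> {
        private final Consumer<T> downstream;
        private final UnaryOperator<T> copier;
        private final boolean copyEnabled;

        ChainedOutput(Consumer<T> downstream, UnaryOperator<T> copier, boolean copyEnabled) {
            this.downstream = downstream;
            this.copier = copier;
            this.copyEnabled = copyEnabled;
        }

        @Override
        public void accept(T record) {
            // Failsafe mode copies; lightweight mode forwards the same reference.
            downstream.accept(copyEnabled ? copier.apply(record) : record);
        }
    }

    // Returns true if the downstream operator received the very same object
    // instance that the upstream function emitted.
    static boolean downstreamSeesSameInstance(boolean copyEnabled) {
        int[] record = {1, 2, 3};
        List<int[]> received = new ArrayList<>();
        ChainedOutput<int[]> out =
            new ChainedOutput<>(received::add, int[]::clone, copyEnabled);
        out.accept(record);
        return received.get(0) == record;
    }

    public static void main(String[] args) {
        System.out.println(downstreamSeesSameInstance(true));  // prints false
        System.out.println(downstreamSeesSameInstance(false)); // prints true
    }
}
```

With the flag on, the downstream operator can safely keep the record; with
the flag off, it shares the instance with the emitting function.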



On Wed, May 20, 2015 at 3:55 PM, Gyula Fóra <gy...@gmail.com> wrote:

> I know it is nicer to have no-copy from a performance perspective, but a
> dataflow system with no immutability guarantee is something very hard to
> describe.
>
> Systems like Storm and Google Dataflow have immutability guarantees, I
> think for the same reason: to provide very clear, easy-to-use semantics.
>
> On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > A vote is the last resort. Consensus through discussion is much nicer.
> And
> > I think we are making progress.
> >
> > We went for the lightweight version in the batch API, because
> >  - there are few cases that are affected (only functions with side effect
> > state)
> >  - you can always switch lightweight -> failsafe in the future (only
> > hardens guarantees), but not the other way around (dropping guarantees)
> > without breaking existing code.
> >
> > If you are strong on that point, I do not want to be a blocker for this.
> I
> > only ask to make a well informed decision, as this behavior is as much
> part
> > of the API as the classname of the DataStream.
> >
> >
> > On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gy...@gmail.com>
> wrote:
> >
> > > I would go for the Failsafe option as a default behaviour with a
> clearly
> > > documented lightweight (no-copy) setting, but I think having a Vote on
> > this
> > > would be the proper way of settling this question.
> > >
> > > On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljoscha@apache.org
> >
> > > wrote:
> > >
> > > > I think that in the long run (maybe not too long) we will have to
> > > > change our stateful operators (windows, basically) to use managed
> > > > memory and spill to disk. (Think jobs that have sliding windows over
> > > > days or weeks) Then the internal operators will take care of
> > > > copying anyways. The problem Gyula mentioned we cannot tackle other
> > > > than by defining how user code must behave.
> > > >
> > > > On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org>
> > wrote:
> > > > > It does not mean we have to behave the same way, it is just an
> > > indication
> > > > > that well-defined behavior can allow you to mess things up.
> > > > >
> > > > > The question is now what is the default mode:
> > > > >  - Failsafe/Heavy (always copy)
> > > > >  - Performance/Lightweight (do not copy)
> > > > >
> > > > >
> > > > > On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > >
> > > > >> This is something that we can clearly define as "should not be
> > done".
> > > > >> Systems do that.
> > > > >> I think if you repeatedly emit (or mutate) the same object for
> > example
> > > > in
> > > > >> Spark, you get an RDD with completely messed up contents.
> > > > >>
> > > > >> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gy...@apache.org>
> > > wrote:
> > > > >>
> > > > >>> If the preceding operator is emitting a mutated object, or does
> > > > something
> > > > >>> with the output object afterwards then its a problem.
> > > > >>>
> > > > >>> Emitting the same object is a special case of this.
> > > > >>>
> > > > >>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org>
> > > > wrote:
> > > > >>>
> > > > >>> > The case you are making is if a preceding operator in a chain
> is
> > > > >>> repeatedly
> > > > >>> > emitting the same object, and the succeeding operator is
> > gathering
> > > > the
> > > > >>> > objects, then it is a problem
> > > > >>> >
> > > > >>> > Or are there cases where the system itself repeatedly emits the
> > > same
> > > > >>> > objects?
> > > > >>> >
> > > > >>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyfora@apache.org
> >
> > > > wrote:
> > > > >>> >
> > > > >>> > > We are designing a system for stateful stream computations,
> > > > assuming
> > > > >>> long
> > > > >>> > > standing operators that gather and store data as the stream
> > > evolves
> > > > >>> > (unlike
> > > > >>> > > in the dataset api). Many programs, like windowing, sampling
> > etc
> > > > hold
> > > > >>> the
> > > > >>> > > state in the form of past data. And without careful
> > understanding
> > > > of
> > > > >>> the
> > > > >>> > > runtime these programs will break or have unnecessary copies.
> > > > >>> > >
> > > > >>> > > This is why I think immutability should be the default so we
> > can
> > > > have
> > > > >>> a
> > > > >>> > > clear dataflow model with immutable streams.
> > > > >>> > >
> > > > >>> > > I see absolutely no reason why we cant have the non-copy
> > version
> > > > as an
> > > > >>> > > optional setting for the users.
> > > > >>> > >
> > > > >>> > >
> > > > >>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <
> parisc@kth.se>
> > > > wrote:
> > > > >>> > >
> > > > >>> > > > @stephan I see your point. If we assume that operators do
> not
> > > > hold
> > > > >>> > > > references in their state to any transmitted records it
> works
> > > > fine.
> > > > >>> We
> > > > >>> > > > therefore need to make this clear to the users. I need to
> > check
> > > > if
> > > > >>> that
> > > > >>> > > > would break semantics in SAMOA or other integrations as
> well
> > > that
> > > > >>> > assume
> > > > >>> > > > immutability. For example in SAMOA there are often local
> > metric
> > > > >>> objects
> > > > >>> > > > that are being constantly mutated and simply forwarded
> > > > periodically
> > > > >>> to
> > > > >>> > > > other (possibly chained) operators that need to evaluate
> > them.
> > > > >>> > > >
> > > > >>> > > > ________________________________________
> > > > >>> > > > From: Gyula Fóra <gy...@apache.org>
> > > > >>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
> > > > >>> > > > To: dev@flink.apache.org
> > > > >>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained
> operator
> > > > calls
> > > > >>> > > >
> > > > >>> > > > "Copy before putting it into a window buffer and any other
> > > group
> > > > >>> > buffer."
> > > > >>> > > >
> > > > >>> > > > Exactly my point. Any stateful operator should be able to
> > > > implement
> > > > >>> > > > something like this without having to worry about copying
> the
> > > > object
> > > > >>> > (and
> > > > >>> > > > at this point the user would need to know whether it comes
> > from
> > > > the
> > > > >>> > > network
> > > > >>> > > > to avoid unnecessary copies), so I don't agree with leaving
> > the
> > > > copy
> > > > >>> > off.
> > > > >>> > > >
> > > > >>> > > > The user can of course specify that the operator is mutable
> > if
> > > he
> > > > >>> wants
> > > > >>> > > > (and he is worried about the performance), But I still
> think
> > > the
> > > > >>> > default
> > > > >>> > > > behaviour should be immutable.
> > > > >>> > > > We cannot force users to not hold object references and
> also
> > it
> > > > is a
> > > > >>> > > quite
> > > > >>> > > > unnatural way of programming in a language like java.
> > > > >>> > > >
> > > > >>> > > >
> > > > >>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <
> > > sewen@apache.org>
> > > > >>> > wrote:
> > > > >>> > > >
> > > > >>> > > > > I am curious why the copying is actually needed.
> > > > >>> > > > >
> > > > >>> > > > > In the batch API, we chain and do not copy and it is
> rather
> > > > >>> > > predictable.
> > > > >>> > > > >
> > > > >>> > > > > The cornerpoints of that design is to follow these rules:
> > > > >>> > > > >
> > > > >>> > > > >  1) Objects read from the network or any buffer are
> always
> > > new
> > > > >>> > objects.
> > > > >>> > > > > That comes naturally when they are deserialized as part
> of
> > > that
> > > > >>> (all
> > > > >>> > > > > buffers store serialized)
> > > > >>> > > > >
> > > > >>> > > > >  2) After a function returned a record (or gives one to
> the
> > > > >>> > collector),
> > > > >>> > > > it
> > > > >>> > > > > if given to the chain of chained operators, but after it
> is
> > > > >>> through
> > > > >>> > the
> > > > >>> > > > > chain, no one else holds a reference to that object.
> > > > >>> > > > >      For that, it is crucial that objects are not stored
> by
> > > > >>> > reference,
> > > > >>> > > > but
> > > > >>> > > > > either stored serialized, or a copy is stored.
> > > > >>> > > > >
> > > > >>> > > > > This is quite solid in the batch API. How about we follow
> > the
> > > > same
> > > > >>> > > > paradigm
> > > > >>> > > > > in the streaming API. We would need to adjust the
> > following:
> > > > >>> > > > >
> > > > >>> > > > > 1) Do not copy between operators (I think this is the
> case
> > > > right
> > > > >>> now)
> > > > >>> > > > >
> > > > >>> > > > > 2) Copy before putting it into a window buffer and any
> > other
> > > > group
> > > > >>> > > > buffer.
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > >
> > > > >>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
> > > > >>> > aljoscha@apache.org
> > > > >>> > > >
> > > > >>> > > > > wrote:
> > > > >>> > > > >
> > > > >>> > > > > > Yes, in fact I anticipated this. There is one central
> > place
> > > > >>> where
> > > > >>> > we
> > > > >>> > > > > > can insert a copy step, in OperatorCollector in
> > > > OutputHandler.
> > > > >>> > > > > >
> > > > >>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <
> > > > parisc@kth.se>
> > > > >>> > > wrote:
> > > > >>> > > > > > > I guess it was not intended ^^.
> > > > >>> > > > > > >
> > > > >>> > > > > > > Chaining should be transparent and not break the
> > > > >>> correct/expected
> > > > >>> > > > > > behaviour.
> > > > >>> > > > > > >
> > > > >>> > > > > > >
> > > > >>> > > > > > > Paris?
> > > > >>> > > > > > >
> > > > >>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <
> > > > mbalassi@apache.org
> > > > >>> >
> > > > >>> > > > wrote:
> > > > >>> > > > > > >
> > > > >>> > > > > > > +1 for copying.
> > > > >>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <
> > > gyfora@apache.org>
> > > > >>> > wrote:
> > > > >>> > > > > > >
> > > > >>> > > > > > > Hey,
> > > > >>> > > > > > >
> > > > >>> > > > > > > The latest streaming operator rework removed the
> > copying
> > > of
> > > > >>> the
> > > > >>> > > > outputs
> > > > >>> > > > > > > before passing them to chained operators. This is a
> > major
> > > > >>> break
> > > > >>> > for
> > > > >>> > > > the
> > > > >>> > > > > > > previous operator semantics which guaranteed
> > > immutability.
> > > > >>> > > > > > >
> > > > >>> > > > > > > I think this change leads to very indeterministic
> > program
> > > > >>> > behaviour
> > > > >>> > > > > from
> > > > >>> > > > > > > the user's perspective as only non-chained
> > outputs/inputs
> > > > >>> will be
> > > > >>> > > > > > mutable.
> > > > >>> > > > > > > If we allow this to happen, users will start
> disabling
> > > > >>> chaining
> > > > >>> > to
> > > > >>> > > > get
> > > > >>> > > > > > > immutability which defeats the purpose. (chaining
> > should
> > > > not
> > > > >>> > affect
> > > > >>> > > > > > program
> > > > >>> > > > > > > behaviour just increase performance)
> > > > >>> > > > > > >
> > > > >>> > > > > > > In my opinion the default setting for each operator
> > > should
> > > > be
> > > > >>> > > > > > immutability
> > > > >>> > > > > > > and the user could override this manually if he/she
> > > wants.
> > > > >>> > > > > > >
> > > > >>> > > > > > > What do you think?
> > > > >>> > > > > > >
> > > > >>> > > > > > > Regards,
> > > > >>> > > > > > > Gyula
> > > > >>> > > > > > >
> > > > >>> > > > > > >
> > > > >>> > > > > >
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>
> > > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Gyula Fóra <gy...@gmail.com>.
I know it is nicer to have no-copy from a performance perspective, but a
dataflow system with no immutability guarantee is something very hard to
describe.

Systems like Storm and Google Dataflow have immutability guarantees, I
think for the same reason: to provide very clear, easy-to-use semantics.
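
To make the concern concrete, here is a minimal plain-Java sketch of the
hazard. It is not Flink code: Counter, emitReused and run are hypothetical
names made up for the illustration. An upstream "operator" reuses one object
for every emit, and a downstream "operator" buffers what it receives by
reference. The handOver argument stands in for the chained call, either
passing the reference through or making a defensive copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Plain-Java illustration only; none of these names are Flink APIs.
class ChainCopyDemo {

    // A mutable record type, as user code often defines it.
    static final class Counter {
        int value;
        Counter(int value) { this.value = value; }
        Counter copy() { return new Counter(value); }
    }

    // Upstream "operator": reuses one object and emits it three times.
    static void emitReused(Consumer<Counter> out) {
        Counter reused = new Counter(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            out.accept(reused);
        }
    }

    // Downstream "operator": buffers every record it receives by reference.
    // handOver models the chained call: identity (no copy) or a defensive copy.
    static List<Integer> run(UnaryOperator<Counter> handOver) {
        List<Counter> buffer = new ArrayList<>();
        emitReused(record -> buffer.add(handOver.apply(record)));
        List<Integer> seen = new ArrayList<>();
        for (Counter c : buffer) {
            seen.add(c.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Without a copy the buffer holds the same object three times.
        System.out.println("no copy: " + run(UnaryOperator.identity())); // [3, 3, 3]
        // With a copy each buffered record keeps the value it was emitted with.
        System.out.println("copy:    " + run(Counter::copy));           // [1, 2, 3]
    }
}
```

The same user code gives different results depending only on whether the
hand-over copies, which is the chained vs. non-chained difference in
miniature.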

On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:

> A vote is the last resort. Consensus through discussion is much nicer. And
> I think we are making progress.
>
> We went for the lightweight version in the batch API, because
>  - there are few cases that are affected (only functions with side effect
> state)
>  - you can always switch lightweight -> failsafe in the future (only
> hardens guarantees), but not the other way around (dropping guarantees)
> without breaking existing code.
>
> If you are strong on that point, I do not want to be a blocker for this. I
> only ask to make a well informed decision, as this behavior is as much part
> of the API as the classname of the DataStream.

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Aljoscha Krettek <al...@apache.org>.
We should maybe run some benchmarks and see what the overhead of
always running a copy between chained operators actually is.
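
A rough starting point could look like the sketch below. It is plain Java
rather than Flink code, and Record, runChain and timeNanos are hypothetical
names for the illustration. It passes records through a simulated chained
hand-over with and without a copy, for a small and a large payload, since
record size will probably matter. A hand-rolled timing loop like this is
only indicative; a real measurement should use a proper harness such as JMH
and the actual serializers.

```java
import java.util.function.UnaryOperator;

// Plain-Java sketch only; none of these names are Flink APIs.
class CopyOverheadSketch {

    // A record with a payload whose size we can vary.
    static final class Record {
        long id;
        final byte[] payload;
        Record(long id, byte[] payload) { this.id = id; this.payload = payload; }
        Record copy() { return new Record(id, payload.clone()); }
    }

    // Passes n records through a simulated chained hand-over.
    // Returns a checksum so the JIT cannot discard the work.
    static long runChain(int n, int payloadBytes, UnaryOperator<Record> handOver) {
        Record reused = new Record(0, new byte[payloadBytes]);
        long checksum = 0;
        for (int i = 0; i < n; i++) {
            reused.id = i;
            Record downstream = handOver.apply(reused);
            checksum += downstream.id + downstream.payload.length;
        }
        return checksum;
    }

    // One warm-up pass, then one timed pass.
    static long timeNanos(int n, int payloadBytes, UnaryOperator<Record> handOver) {
        runChain(n, payloadBytes, handOver);
        long start = System.nanoTime();
        runChain(n, payloadBytes, handOver);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        int n = 200_000;
        for (int size : new int[] {16, 4096}) {
            long noCopy = timeNanos(n, size, UnaryOperator.identity());
            long copy = timeNanos(n, size, Record::copy);
            System.out.printf("payload=%d B  no-copy=%d us  copy=%d us%n",
                    size, noCopy / 1_000, copy / 1_000);
        }
    }
}
```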

On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:
> A vote is the last resort. Consensus through discussion is much nicer. And
> I think we are making progress.
>
> We went for the lightweight version in the batch API, because
>  - there are few cases that are affected (only functions with side effect
> state)
>  - you can always switch lightweight -> failsafe in the future (only
> hardens guarantees), but not the other way around (dropping guarantees)
> without breaking existing code.
>
> If you are strong on that point, I do not want to be a blocker for this. I
> only ask to make a well informed decision, as this behavior is as much part
> of the API as the classname of the DataStream.

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
A vote is the last resort. Consensus through discussion is much nicer. And
I think we are making progress.

We went for the lightweight version in the batch API, because
 - there are few cases that are affected (only functions with side effect
state)
 - you can always switch lightweight -> failsafe in the future (only
hardens guarantees), but not the other way around (dropping guarantees)
without breaking existing code.

If you are strong on that point, I do not want to be a blocker for this. I
only ask to make a well informed decision, as this behavior is as much part
of the API as the classname of the DataStream.


On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gy...@gmail.com> wrote:

> I would go for the Failsafe option as a default behaviour with a clearly
> documented lightweight (no-copy) setting, but I think having a Vote on this
> would be the proper way of settling this question.

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Gyula Fóra <gy...@gmail.com>.
I would go for the Failsafe option as a default behaviour with a clearly
documented lightweight (no-copy) setting, but I think having a Vote on this
would be the proper way of settling this question.

On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> I think that in the long run (maybe not too long) we will have to
> change our stateful operators (windows, basically) to use managed
> memory and spill to disk. (Think jobs that have sliding windows over
> days or weeks.) Then the internal operators will take care of
> copying anyways. The problem Gyula mentioned we cannot tackle other
> than by defining how user code must behave.

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Aljoscha Krettek <al...@apache.org>.
I think that in the long run (maybe not too long) we will have to
change our stateful operators (windows, basically) to use managed
memory and spill to disk. (Think jobs that have sliding windows over
days or weeks.) Then the internal operators will take care of
copying anyways. The problem Gyula mentioned we cannot tackle other
than by defining how user code must behave.
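
As a rough illustration of why a serialized buffer "takes care of copying",
here is a minimal sketch. It is not Flink code: Reading and
SerializedWindowBuffer are hypothetical names, and a plain list of byte
arrays stands in for managed memory that could spill to disk. The point is
only that a record stored as bytes cannot be affected by later mutation of
the original object, and every read produces a fresh object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class SerializedBufferSketch {

    /** Hypothetical mutable record type, used only for this illustration. */
    static final class Reading {
        long timestamp;
        double value;

        Reading(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Hypothetical window buffer that keeps records in serialized form only. */
    static final class SerializedWindowBuffer {
        // Stand-in for managed memory segments; one byte array per record.
        private final List<byte[]> slots = new ArrayList<>();

        /** Serializes the record; no reference to the caller's object is kept. */
        void add(Reading reading) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(reading.timestamp);
                out.writeDouble(reading.value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            slots.add(bytes.toByteArray());
        }

        /** Deserializes into a new object on every call. */
        Reading get(int index) {
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(slots.get(index)))) {
                return new Reading(in.readLong(), in.readDouble());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Buffers a record, mutates the original afterwards, and reads the buffered value back. */
    static double bufferedValueAfterUpstreamMutation() {
        SerializedWindowBuffer buffer = new SerializedWindowBuffer();
        Reading reading = new Reading(1L, 10.0);
        buffer.add(reading);
        reading.value = 99.0; // upstream keeps mutating its object after handing it over
        return buffer.get(0).value;
    }

    public static void main(String[] args) {
        System.out.println(bufferedValueAfterUpstreamMutation()); // prints 10.0
    }
}
```

With such a buffer the windowing operators would be safe regardless of the
chaining mode; user functions that keep plain object references would still
need either the copy or a documented rule.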

On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org> wrote:
> It does not mean we have to behave the same way, it is just an indication
> that well-defined behavior can allow you to mess things up.
>
> The question is now what is the default mode:
>  - Failsafe/Heavy (always copy)
>  - Performance/Lightweight (do not copy)

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
It does not mean we have to behave the same way, it is just an indication
that well-defined behavior can allow you to mess things up.

The question is now what is the default mode:
 - Failsafe/Heavy (always copy)
 - Performance/Lightweight (do not copy)
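
To make the two modes concrete, here is a minimal sketch of a chained output
with a switch for the copy. It is an assumption-laden illustration, not the
actual streaming runtime: ChainedOutput, its copier argument and the
copyEnabled flag are hypothetical names, and an int[] with clone() stands in
for a record and its serializer-based copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

final class ChainedOutputSketch {

    /** Hypothetical hand-over point between two chained operators. */
    static final class ChainedOutput<T> {
        private final Consumer<T> next;          // the next operator in the chain
        private final UnaryOperator<T> copier;   // how to deep-copy a record
        private final boolean copyEnabled;       // true = failsafe, false = lightweight

        ChainedOutput(Consumer<T> next, UnaryOperator<T> copier, boolean copyEnabled) {
            this.next = next;
            this.copier = copier;
            this.copyEnabled = copyEnabled;
        }

        /** Passes the record on, copying it first only in failsafe mode. */
        void collect(T record) {
            next.accept(copyEnabled ? copier.apply(record) : record);
        }
    }

    /** Reports whether the downstream operator received the very same instance. */
    static boolean downstreamGetsSameInstance(boolean copyEnabled) {
        List<int[]> received = new ArrayList<>();
        ChainedOutput<int[]> output =
                new ChainedOutput<>(received::add, a -> a.clone(), copyEnabled);
        int[] record = {1, 2, 3};
        output.collect(record);
        return received.get(0) == record;
    }

    public static void main(String[] args) {
        System.out.println(downstreamGetsSameInstance(true));  // failsafe: prints false
        System.out.println(downstreamGetsSameInstance(false)); // lightweight: prints true
    }
}
```

In this shape the default is a single boolean, so choosing failsafe as the
default and documenting the lightweight setting would not require two code
paths in the operators themselves.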


On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org> wrote:

> This is something that we can clearly define as "should not be done".
> Systems do that.
> I think if you repeatedly emit (or mutate) the same object for example in
> Spark, you get an RDD with completely messed up contents.

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
This is something that we can clearly define as "should not be done".
Other systems do that as well.
I think if you repeatedly emit (or mutate) the same object, for example in
Spark, you get an RDD with completely messed up contents.

On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gy...@apache.org> wrote:

> If the preceding operator is emitting a mutated object, or does something
> with the output object afterwards then its a problem.
>
> Emitting the same object is a special case of this.
>
> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > The case you are making is if a preceding operator in a chain is
> repeatedly
> > emitting the same object, and the succeeding operator is gathering the
> > objects, then it is a problem
> >
> > Or are there cases where the system itself repeatedly emits the same
> > objects?
> >
> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gy...@apache.org> wrote:
> >
> > > We are designing a system for stateful stream computations, assuming
> long
> > > standing operators that gather and store data as the stream evolves
> > (unlike
> > > in the dataset api). Many programs, like windowing, sampling etc hold
> the
> > > state in the form of past data. And without careful understanding of
> the
> > > runtime these programs will break or have unnecessary copies.
> > >
> > > This is why I think immutability should be the default so we can have a
> > > clear dataflow model with immutable streams.
> > >
> > > I see absolutely no reason why we cant have the non-copy version as an
> > > optional setting for the users.
> > >
> > >
> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <pa...@kth.se> wrote:
> > >
> > > > @stephan I see your point. If we assume that operators do not hold
> > > > references in their state to any transmitted records it works fine.
> We
> > > > therefore need to make this clear to the users. I need to check if
> that
> > > > would break semantics in SAMOA or other integrations as well that
> > assume
> > > > immutability. For example in SAMOA there are often local metric
> objects
> > > > that are being constantly mutated and simply forwarded periodically
> to
> > > > other (possibly chained) operators that need to evaluate them.
> > > >
> > > > ________________________________________
> > > > From: Gyula Fóra <gy...@apache.org>
> > > > Sent: Wednesday, May 20, 2015 2:06 PM
> > > > To: dev@flink.apache.org
> > > > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
> > > >
> > > > "Copy before putting it into a window buffer and any other group
> > buffer."
> > > >
> > > > Exactly my point. Any stateful operator should be able to implement
> > > > something like this without having to worry about copying the object
> > (and
> > > > at this point the user would need to know whether it comes from the
> > > network
> > > > to avoid unnecessary copies), so I don't agree with leaving the copy
> > off.
> > > >
> > > > The user can of course specify that the operator is mutable if he
> wants
> > > > (and he is worried about the performance), But I still think the
> > default
> > > > behaviour should be immutable.
> > > > We cannot force users to not hold object references and also it is a
> > > quite
> > > > unnatural way of programming in a language like java.
> > > >
> > > >
> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org>
> > wrote:
> > > >
> > > > > I am curious why the copying is actually needed.
> > > > >
> > > > > In the batch API, we chain and do not copy and it is rather
> > > predictable.
> > > > >
> > > > > The cornerpoints of that design is to follow these rules:
> > > > >
> > > > >  1) Objects read from the network or any buffer are always new
> > objects.
> > > > > That comes naturally when they are deserialized as part of that
> (all
> > > > > buffers store serialized)
> > > > >
> > > > >  2) After a function returned a record (or gives one to the
> > collector),
> > > > it
> > > > > if given to the chain of chained operators, but after it is through
> > the
> > > > > chain, no one else holds a reference to that object.
> > > > >      For that, it is crucial that objects are not stored by
> > reference,
> > > > but
> > > > > either stored serialized, or a copy is stored.
> > > > >
> > > > > This is quite solid in the batch API. How about we follow the same
> > > > paradigm
> > > > > in the streaming API. We would need to adjust the following:
> > > > >
> > > > > 1) Do not copy between operators (I think this is the case right
> now)
> > > > >
> > > > > 2) Copy before putting it into a window buffer and any other group
> > > > buffer.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
> > aljoscha@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > Yes, in fact I anticipated this. There is one central place where
> > we
> > > > > > can insert a copy step, in OperatorCollector in OutputHandler.
> > > > > >
> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se>
> > > wrote:
> > > > > > > I guess it was not intended ^^.
> > > > > > >
> > > > > > > Chaining should be transparent and not break the
> correct/expected
> > > > > > behaviour.
> > > > > > >
> > > > > > >
> > > > > > > Paris?
> > > > > > >
> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org>
> > > > wrote:
> > > > > > >
> > > > > > > +1 for copying.
> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org>
> > wrote:
> > > > > > >
> > > > > > > Hey,
> > > > > > >
> > > > > > > The latest streaming operator rework removed the copying of the
> > > > outputs
> > > > > > > before passing them to chained operators. This is a major break
> > for
> > > > the
> > > > > > > previous operator semantics which guaranteed immutability.
> > > > > > >
> > > > > > > I think this change leads to very indeterministic program
> > behaviour
> > > > > from
> > > > > > > the user's perspective as only non-chained outputs/inputs will
> be
> > > > > > mutable.
> > > > > > > If we allow this to happen, users will start disabling chaining
> > to
> > > > get
> > > > > > > immutability which defeats the purpose. (chaining should not
> > affect
> > > > > > program
> > > > > > > behaviour just increase performance)
> > > > > > >
> > > > > > > In my opinion the default setting for each operator should be
> > > > > > immutability
> > > > > > > and the user could override this manually if he/she wants.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Gyula
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Gyula Fóra <gy...@apache.org>.
If the preceding operator is emitting a mutated object, or does something
with the output object afterwards, then it's a problem.

Emitting the same object is a special case of this.
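
To illustrate the problem described above, here is a minimal sketch in plain
Java. It is not Flink code: Counter, GatheringOperator and the
copyBeforeHandover flag are hypothetical names made up for this example. The
upstream loop reuses and mutates a single object, while the downstream
operator gathers the records it receives by reference.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these types are Flink classes.
class AliasingDemo {

    // A simple mutable record.
    static final class Counter {
        int value;

        Counter(int value) {
            this.value = value;
        }

        Counter copy() {
            return new Counter(value);
        }
    }

    // Downstream "operator" that gathers whatever it receives by reference.
    static final class GatheringOperator {
        final List<Counter> seen = new ArrayList<>();

        void process(Counter record) {
            seen.add(record);
        }
    }

    // Upstream "operator" that reuses one object and mutates it between emits.
    // Returns the values the downstream operator ends up holding.
    static List<Integer> run(boolean copyBeforeHandover) {
        GatheringOperator downstream = new GatheringOperator();
        Counter reused = new Counter(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            downstream.process(copyBeforeHandover ? reused.copy() : reused);
        }
        List<Integer> values = new ArrayList<>();
        for (Counter c : downstream.seen) {
            values.add(c.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println("no copy:   " + run(false)); // [3, 3, 3]
        System.out.println("with copy: " + run(true));  // [1, 2, 3]
    }
}
```

Without the copy, all three gathered entries alias the same object, so the
downstream state only reflects the last mutation.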

On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org> wrote:

> The case you are making is if a preceding operator in a chain is repeatedly
> emitting the same object, and the succeeding operator is gathering the
> objects, then it is a problem
>
> Or are there cases where the system itself repeatedly emits the same
> objects?
>
> On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gy...@apache.org> wrote:
>
> > We are designing a system for stateful stream computations, assuming long
> > standing operators that gather and store data as the stream evolves
> (unlike
> > in the dataset api). Many programs, like windowing, sampling etc hold the
> > state in the form of past data. And without careful understanding of the
> > runtime these programs will break or have unnecessary copies.
> >
> > This is why I think immutability should be the default so we can have a
> > clear dataflow model with immutable streams.
> >
> > I see absolutely no reason why we cant have the non-copy version as an
> > optional setting for the users.
> >
> >
> > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <pa...@kth.se> wrote:
> >
> > > @stephan I see your point. If we assume that operators do not hold
> > > references in their state to any transmitted records it works fine. We
> > > therefore need to make this clear to the users. I need to check if that
> > > would break semantics in SAMOA or other integrations as well that
> assume
> > > immutability. For example in SAMOA there are often local metric objects
> > > that are being constantly mutated and simply forwarded periodically to
> > > other (possibly chained) operators that need to evaluate them.
> > >
> > > ________________________________________
> > > From: Gyula Fóra <gy...@apache.org>
> > > Sent: Wednesday, May 20, 2015 2:06 PM
> > > To: dev@flink.apache.org
> > > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
> > >
> > > "Copy before putting it into a window buffer and any other group
> buffer."
> > >
> > > Exactly my point. Any stateful operator should be able to implement
> > > something like this without having to worry about copying the object
> (and
> > > at this point the user would need to know whether it comes from the
> > network
> > > to avoid unnecessary copies), so I don't agree with leaving the copy
> off.
> > >
> > > The user can of course specify that the operator is mutable if he wants
> > > (and he is worried about the performance), But I still think the
> default
> > > behaviour should be immutable.
> > > We cannot force users to not hold object references and also it is a
> > quite
> > > unnatural way of programming in a language like java.
> > >
> > >
> > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > > > I am curious why the copying is actually needed.
> > > >
> > > > In the batch API, we chain and do not copy and it is rather
> > predictable.
> > > >
> > > > The cornerpoints of that design is to follow these rules:
> > > >
> > > >  1) Objects read from the network or any buffer are always new
> objects.
> > > > That comes naturally when they are deserialized as part of that (all
> > > > buffers store serialized)
> > > >
> > > >  2) After a function returned a record (or gives one to the
> collector),
> > > it
> > > > if given to the chain of chained operators, but after it is through
> the
> > > > chain, no one else holds a reference to that object.
> > > >      For that, it is crucial that objects are not stored by
> reference,
> > > but
> > > > either stored serialized, or a copy is stored.
> > > >
> > > > This is quite solid in the batch API. How about we follow the same
> > > paradigm
> > > > in the streaming API. We would need to adjust the following:
> > > >
> > > > 1) Do not copy between operators (I think this is the case right now)
> > > >
> > > > 2) Copy before putting it into a window buffer and any other group
> > > buffer.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Yes, in fact I anticipated this. There is one central place where
> we
> > > > > can insert a copy step, in OperatorCollector in OutputHandler.
> > > > >
> > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se>
> > wrote:
> > > > > > I guess it was not intended ^^.
> > > > > >
> > > > > > Chaining should be transparent and not break the correct/expected
> > > > > behaviour.
> > > > > >
> > > > > >
> > > > > > Paris?
> > > > > >
> > > > > > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org>
> > > wrote:
> > > > > >
> > > > > > +1 for copying.
> > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org>
> wrote:
> > > > > >
> > > > > > Hey,
> > > > > >
> > > > > > The latest streaming operator rework removed the copying of the
> > > outputs
> > > > > > before passing them to chained operators. This is a major break
> for
> > > the
> > > > > > previous operator semantics which guaranteed immutability.
> > > > > >
> > > > > > I think this change leads to very indeterministic program
> behaviour
> > > > from
> > > > > > the user's perspective as only non-chained outputs/inputs will be
> > > > > mutable.
> > > > > > If we allow this to happen, users will start disabling chaining
> to
> > > get
> > > > > > immutability which defeats the purpose. (chaining should not
> affect
> > > > > program
> > > > > > behaviour just increase performance)
> > > > > >
> > > > > > In my opinion the default setting for each operator should be
> > > > > immutability
> > > > > > and the user could override this manually if he/she wants.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Regards,
> > > > > > Gyula
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
The case you are making is that if a preceding operator in a chain is
repeatedly emitting the same object, and the succeeding operator is gathering
the objects, then it is a problem.

Or are there cases where the system itself repeatedly emits the same
objects?

On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gy...@apache.org> wrote:

> We are designing a system for stateful stream computations, assuming long
> standing operators that gather and store data as the stream evolves (unlike
> in the dataset api). Many programs, like windowing, sampling etc hold the
> state in the form of past data. And without careful understanding of the
> runtime these programs will break or have unnecessary copies.
>
> This is why I think immutability should be the default so we can have a
> clear dataflow model with immutable streams.
>
> I see absolutely no reason why we cant have the non-copy version as an
> optional setting for the users.
>
>
> On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <pa...@kth.se> wrote:
>
> > @stephan I see your point. If we assume that operators do not hold
> > references in their state to any transmitted records it works fine. We
> > therefore need to make this clear to the users. I need to check if that
> > would break semantics in SAMOA or other integrations as well that assume
> > immutability. For example in SAMOA there are often local metric objects
> > that are being constantly mutated and simply forwarded periodically to
> > other (possibly chained) operators that need to evaluate them.
> >
> > ________________________________________
> > From: Gyula Fóra <gy...@apache.org>
> > Sent: Wednesday, May 20, 2015 2:06 PM
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
> >
> > "Copy before putting it into a window buffer and any other group buffer."
> >
> > Exactly my point. Any stateful operator should be able to implement
> > something like this without having to worry about copying the object (and
> > at this point the user would need to know whether it comes from the
> network
> > to avoid unnecessary copies), so I don't agree with leaving the copy off.
> >
> > The user can of course specify that the operator is mutable if he wants
> > (and he is worried about the performance), But I still think the default
> > behaviour should be immutable.
> > We cannot force users to not hold object references and also it is a
> quite
> > unnatural way of programming in a language like java.
> >
> >
> > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > I am curious why the copying is actually needed.
> > >
> > > In the batch API, we chain and do not copy and it is rather
> predictable.
> > >
> > > The cornerpoints of that design is to follow these rules:
> > >
> > >  1) Objects read from the network or any buffer are always new objects.
> > > That comes naturally when they are deserialized as part of that (all
> > > buffers store serialized)
> > >
> > >  2) After a function returned a record (or gives one to the collector),
> > it
> > > if given to the chain of chained operators, but after it is through the
> > > chain, no one else holds a reference to that object.
> > >      For that, it is crucial that objects are not stored by reference,
> > but
> > > either stored serialized, or a copy is stored.
> > >
> > > This is quite solid in the batch API. How about we follow the same
> > paradigm
> > > in the streaming API. We would need to adjust the following:
> > >
> > > 1) Do not copy between operators (I think this is the case right now)
> > >
> > > 2) Copy before putting it into a window buffer and any other group
> > buffer.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <aljoscha@apache.org
> >
> > > wrote:
> > >
> > > > Yes, in fact I anticipated this. There is one central place where we
> > > > can insert a copy step, in OperatorCollector in OutputHandler.
> > > >
> > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se>
> wrote:
> > > > > I guess it was not intended ^^.
> > > > >
> > > > > Chaining should be transparent and not break the correct/expected
> > > > behaviour.
> > > > >
> > > > >
> > > > > Paris?
> > > > >
> > > > > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org>
> > wrote:
> > > > >
> > > > > +1 for copying.
> > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:
> > > > >
> > > > > Hey,
> > > > >
> > > > > The latest streaming operator rework removed the copying of the
> > outputs
> > > > > before passing them to chained operators. This is a major break for
> > the
> > > > > previous operator semantics which guaranteed immutability.
> > > > >
> > > > > I think this change leads to very indeterministic program behaviour
> > > from
> > > > > the user's perspective as only non-chained outputs/inputs will be
> > > > mutable.
> > > > > If we allow this to happen, users will start disabling chaining to
> > get
> > > > > immutability which defeats the purpose. (chaining should not affect
> > > > program
> > > > > behaviour just increase performance)
> > > > >
> > > > > In my opinion the default setting for each operator should be
> > > > immutability
> > > > > and the user could override this manually if he/she wants.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Regards,
> > > > > Gyula
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Gyula Fóra <gy...@apache.org>.
We are designing a system for stateful stream computations, assuming
long-standing operators that gather and store data as the stream evolves
(unlike in the DataSet API). Many programs, like windowing, sampling, etc.,
hold their state in the form of past data. Without a careful understanding of
the runtime, these programs will break or make unnecessary copies.

This is why I think immutability should be the default so we can have a
clear dataflow model with immutable streams.

I see absolutely no reason why we can't have the non-copy version as an
optional setting for the users.


On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <pa...@kth.se> wrote:

> @stephan I see your point. If we assume that operators do not hold
> references in their state to any transmitted records it works fine. We
> therefore need to make this clear to the users. I need to check if that
> would break semantics in SAMOA or other integrations as well that assume
> immutability. For example in SAMOA there are often local metric objects
> that are being constantly mutated and simply forwarded periodically to
> other (possibly chained) operators that need to evaluate them.
>
> ________________________________________
> From: Gyula Fóra <gy...@apache.org>
> Sent: Wednesday, May 20, 2015 2:06 PM
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
>
> "Copy before putting it into a window buffer and any other group buffer."
>
> Exactly my point. Any stateful operator should be able to implement
> something like this without having to worry about copying the object (and
> at this point the user would need to know whether it comes from the network
> to avoid unnecessary copies), so I don't agree with leaving the copy off.
>
> The user can of course specify that the operator is mutable if he wants
> (and he is worried about the performance), But I still think the default
> behaviour should be immutable.
> We cannot force users to not hold object references and also it is a quite
> unnatural way of programming in a language like java.
>
>
> On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > I am curious why the copying is actually needed.
> >
> > In the batch API, we chain and do not copy and it is rather predictable.
> >
> > The cornerpoints of that design is to follow these rules:
> >
> >  1) Objects read from the network or any buffer are always new objects.
> > That comes naturally when they are deserialized as part of that (all
> > buffers store serialized)
> >
> >  2) After a function returned a record (or gives one to the collector),
> it
> > if given to the chain of chained operators, but after it is through the
> > chain, no one else holds a reference to that object.
> >      For that, it is crucial that objects are not stored by reference,
> but
> > either stored serialized, or a copy is stored.
> >
> > This is quite solid in the batch API. How about we follow the same
> paradigm
> > in the streaming API. We would need to adjust the following:
> >
> > 1) Do not copy between operators (I think this is the case right now)
> >
> > 2) Copy before putting it into a window buffer and any other group
> buffer.
> >
> >
> >
> >
> >
> >
> >
> >
> > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Yes, in fact I anticipated this. There is one central place where we
> > > can insert a copy step, in OperatorCollector in OutputHandler.
> > >
> > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se> wrote:
> > > > I guess it was not intended ^^.
> > > >
> > > > Chaining should be transparent and not break the correct/expected
> > > behaviour.
> > > >
> > > >
> > > > Paris?
> > > >
> > > > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org>
> wrote:
> > > >
> > > > +1 for copying.
> > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:
> > > >
> > > > Hey,
> > > >
> > > > The latest streaming operator rework removed the copying of the
> outputs
> > > > before passing them to chained operators. This is a major break for
> the
> > > > previous operator semantics which guaranteed immutability.
> > > >
> > > > I think this change leads to very indeterministic program behaviour
> > from
> > > > the user's perspective as only non-chained outputs/inputs will be
> > > mutable.
> > > > If we allow this to happen, users will start disabling chaining to
> get
> > > > immutability which defeats the purpose. (chaining should not affect
> > > program
> > > > behaviour just increase performance)
> > > >
> > > > In my opinion the default setting for each operator should be
> > > immutability
> > > > and the user could override this manually if he/she wants.
> > > >
> > > > What do you think?
> > > >
> > > > Regards,
> > > > Gyula
> > > >
> > > >
> > >
> >
>

RE: [DISCUSS] Re-add record copy to chained operator calls

Posted by Paris Carbone <pa...@kth.se>.
@stephan I see your point. If we assume that operators do not hold references in their state to any transmitted records it works fine. We therefore need to make this clear to the users. I need to check if that would break semantics in SAMOA or other integrations as well that assume immutability. For example in SAMOA there are often local metric objects that are being constantly mutated and simply forwarded periodically to other (possibly chained) operators that need to evaluate them. 

________________________________________
From: Gyula Fóra <gy...@apache.org>
Sent: Wednesday, May 20, 2015 2:06 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Re-add record copy to chained operator calls

"Copy before putting it into a window buffer and any other group buffer."

Exactly my point. Any stateful operator should be able to implement
something like this without having to worry about copying the object (and
at this point the user would need to know whether it comes from the network
to avoid unnecessary copies), so I don't agree with leaving the copy off.

The user can of course specify that the operator is mutable if he wants
(and he is worried about the performance), But I still think the default
behaviour should be immutable.
We cannot force users to not hold object references and also it is a quite
unnatural way of programming in a language like java.


On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:

> I am curious why the copying is actually needed.
>
> In the batch API, we chain and do not copy and it is rather predictable.
>
> The cornerpoints of that design is to follow these rules:
>
>  1) Objects read from the network or any buffer are always new objects.
> That comes naturally when they are deserialized as part of that (all
> buffers store serialized)
>
>  2) After a function returned a record (or gives one to the collector), it
> if given to the chain of chained operators, but after it is through the
> chain, no one else holds a reference to that object.
>      For that, it is crucial that objects are not stored by reference, but
> either stored serialized, or a copy is stored.
>
> This is quite solid in the batch API. How about we follow the same paradigm
> in the streaming API. We would need to adjust the following:
>
> 1) Do not copy between operators (I think this is the case right now)
>
> 2) Copy before putting it into a window buffer and any other group buffer.
>
>
>
>
>
>
>
>
> On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Yes, in fact I anticipated this. There is one central place where we
> > can insert a copy step, in OperatorCollector in OutputHandler.
> >
> > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se> wrote:
> > > I guess it was not intended ^^.
> > >
> > > Chaining should be transparent and not break the correct/expected
> > behaviour.
> > >
> > >
> > > Paris?
> > >
> > > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org> wrote:
> > >
> > > +1 for copying.
> > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:
> > >
> > > Hey,
> > >
> > > The latest streaming operator rework removed the copying of the outputs
> > > before passing them to chained operators. This is a major break for the
> > > previous operator semantics which guaranteed immutability.
> > >
> > > I think this change leads to very indeterministic program behaviour
> from
> > > the user's perspective as only non-chained outputs/inputs will be
> > mutable.
> > > If we allow this to happen, users will start disabling chaining to get
> > > immutability which defeats the purpose. (chaining should not affect
> > program
> > > behaviour just increase performance)
> > >
> > > In my opinion the default setting for each operator should be
> > immutability
> > > and the user could override this manually if he/she wants.
> > >
> > > What do you think?
> > >
> > > Regards,
> > > Gyula
> > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Gyula Fóra <gy...@apache.org>.
"Copy before putting it into a window buffer and any other group buffer."

Exactly my point. Any stateful operator should be able to implement
something like this without having to worry about copying the object (and
at this point the user would need to know whether it comes from the network
to avoid unnecessary copies), so I don't agree with leaving the copy off.

The user can of course specify that the operator is mutable if he wants
(and he is worried about the performance), but I still think the default
behaviour should be immutable.
We cannot force users to not hold object references, and it is also quite
an unnatural way of programming in a language like Java.
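
A "copy by default, opt out per operator" collector could look roughly like
the sketch below. This is only an assumption about the shape of such a
switch, not the actual OperatorCollector or OutputHandler code:
ChainedCollector, the copier function and the copyEnabled flag are
hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical sketch; not the actual Flink collector implementation.
class CopyingCollectorDemo {

    // Hands records to the next chained operator, copying them unless the
    // user has explicitly switched the copy off.
    static final class ChainedCollector<T> {
        private final Consumer<T> next;
        private final UnaryOperator<T> copier;
        private final boolean copyEnabled;

        ChainedCollector(Consumer<T> next, UnaryOperator<T> copier, boolean copyEnabled) {
            this.next = next;
            this.copier = copier;
            this.copyEnabled = copyEnabled;
        }

        void collect(T record) {
            next.accept(copyEnabled ? copier.apply(record) : record);
        }
    }

    // Returns true if the downstream side received the very same instance
    // that the upstream side emitted.
    static boolean downstreamSeesSameInstance(boolean copyEnabled) {
        List<int[]> received = new ArrayList<>();
        ChainedCollector<int[]> collector =
                new ChainedCollector<>(received::add, int[]::clone, copyEnabled);
        int[] record = {1, 2, 3};
        collector.collect(record);
        return received.get(0) == record;
    }

    public static void main(String[] args) {
        System.out.println("copy on:  same instance = " + downstreamSeesSameInstance(true));
        System.out.println("copy off: same instance = " + downstreamSeesSameInstance(false));
    }
}
```

With the flag on, a stateful downstream operator can keep the record without
knowing whether it arrived through a chain or from the network.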


On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:

> I am curious why the copying is actually needed.
>
> In the batch API, we chain and do not copy and it is rather predictable.
>
> The cornerpoints of that design is to follow these rules:
>
>  1) Objects read from the network or any buffer are always new objects.
> That comes naturally when they are deserialized as part of that (all
> buffers store serialized)
>
>  2) After a function returned a record (or gives one to the collector), it
> if given to the chain of chained operators, but after it is through the
> chain, no one else holds a reference to that object.
>      For that, it is crucial that objects are not stored by reference, but
> either stored serialized, or a copy is stored.
>
> This is quite solid in the batch API. How about we follow the same paradigm
> in the streaming API. We would need to adjust the following:
>
> 1) Do not copy between operators (I think this is the case right now)
>
> 2) Copy before putting it into a window buffer and any other group buffer.
>
>
>
>
>
>
>
>
> On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Yes, in fact I anticipated this. There is one central place where we
> > can insert a copy step, in OperatorCollector in OutputHandler.
> >
> > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se> wrote:
> > > I guess it was not intended ^^.
> > >
> > > Chaining should be transparent and not break the correct/expected
> > behaviour.
> > >
> > >
> > > Paris?
> > >
> > > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org> wrote:
> > >
> > > +1 for copying.
> > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:
> > >
> > > Hey,
> > >
> > > The latest streaming operator rework removed the copying of the outputs
> > > before passing them to chained operators. This is a major break for the
> > > previous operator semantics which guaranteed immutability.
> > >
> > > I think this change leads to very indeterministic program behaviour
> from
> > > the user's perspective as only non-chained outputs/inputs will be
> > mutable.
> > > If we allow this to happen, users will start disabling chaining to get
> > > immutability which defeats the purpose. (chaining should not affect
> > program
> > > behaviour just increase performance)
> > >
> > > In my opinion the default setting for each operator should be
> > immutability
> > > and the user could override this manually if he/she wants.
> > >
> > > What do you think?
> > >
> > > Regards,
> > > Gyula
> > >
> > >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Stephan Ewen <se...@apache.org>.
I am curious why the copying is actually needed.

In the batch API, we chain and do not copy and it is rather predictable.

The cornerstones of that design are the following rules:

 1) Objects read from the network or any buffer are always new objects.
That comes naturally because they are deserialized on read (all buffers
store serialized data).

 2) After a function returns a record (or gives one to the collector), it
is given to the chain of chained operators, but once it is through the
chain, no one else holds a reference to that object.
     For that, it is crucial that objects are not stored by reference, but
either stored serialized or stored as a copy.

This is quite solid in the batch API. How about we follow the same paradigm
in the streaming API? We would need to adjust the following:

1) Do not copy between operators (I think this is the case right now)

2) Copy before putting it into a window buffer and any other group buffer.
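
As a rough sketch of point 2 (this is not Flink's actual buffer code; `Event`,
`CopyingWindowBuffer` and `WindowBufferCopyDemo` are hypothetical names made up
for illustration), copying on insert could look roughly like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical mutable record type, used only for this illustration.
class Event {
    String key;
    int value;

    Event(String key, int value) {
        this.key = key;
        this.value = value;
    }

    Event copy() {
        return new Event(key, value);
    }
}

// Hypothetical buffer that never keeps the caller's reference:
// every record is copied before it is stored.
class CopyingWindowBuffer<T> {
    private final List<T> elements = new ArrayList<>();
    private final UnaryOperator<T> copier;

    CopyingWindowBuffer(UnaryOperator<T> copier) {
        this.copier = copier;
    }

    void store(T record) {
        elements.add(copier.apply(record));
    }

    List<T> contents() {
        return elements;
    }
}

class WindowBufferCopyDemo {
    public static void main(String[] args) {
        CopyingWindowBuffer<Event> buffer = new CopyingWindowBuffer<>(Event::copy);

        // An upstream function that reuses one object for all its outputs.
        Event reused = new Event("a", 1);
        buffer.store(reused);
        reused.value = 2; // mutation after the first store
        buffer.store(reused);

        // The first stored element is unaffected by the later mutation.
        for (Event e : buffer.contents()) {
            System.out.println(e.key + "=" + e.value);
        }
    }
}
```

Storing the records in serialized form instead would give the same isolation;
the explicit copy here only stands in for whatever the buffer would really use.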

On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Yes, in fact I anticipated this. There is one central place where we
> can insert a copy step, in OperatorCollector in OutputHandler.
>
> On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se> wrote:
> > I guess it was not intended ^^.
> >
> > > Chaining should be transparent and not break the correct/expected behaviour.
> >
> >
> > Paris?
> >
> > On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org> wrote:
> >
> > +1 for copying.
> > On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:
> >
> > Hey,
> >
> > The latest streaming operator rework removed the copying of the outputs
> > before passing them to chained operators. This is a major break for the
> > previous operator semantics which guaranteed immutability.
> >
> > I think this change leads to very indeterministic program behaviour from
> > > the user's perspective as only non-chained outputs/inputs will be mutable.
> > > If we allow this to happen, users will start disabling chaining to get
> > > immutability which defeats the purpose. (chaining should not affect program
> > > behaviour just increase performance)
> >
> > > In my opinion the default setting for each operator should be immutability
> > > and the user could override this manually if he/she wants.
> >
> > What do you think?
> >
> > Regards,
> > Gyula
> >
> >
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, in fact I anticipated this. There is one central place where we
can insert a copy step, in OperatorCollector in OutputHandler.
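
To make the idea concrete, here is a minimal sketch of such a copy step. It
does not use the real OperatorCollector or OutputHandler classes:
`SimpleCollector`, `Reading`, `CopyingCollector` and `ChainedCopyDemo` are
hypothetical names, and the `copyEnabled` flag is only an assumption about how
a switch to deactivate the copy might look.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a collector interface; not Flink's actual Collector.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical mutable record type.
class Reading {
    int value;

    Reading(int value) {
        this.value = value;
    }

    Reading copy() {
        return new Reading(value);
    }
}

// Wraps the collector of the next chained operator and, if enabled,
// copies each record before handing it over.
class CopyingCollector<T> implements SimpleCollector<T> {
    private final SimpleCollector<T> downstream;
    private final UnaryOperator<T> copier;
    private final boolean copyEnabled;

    CopyingCollector(SimpleCollector<T> downstream, UnaryOperator<T> copier, boolean copyEnabled) {
        this.downstream = downstream;
        this.copier = copier;
        this.copyEnabled = copyEnabled;
    }

    @Override
    public void collect(T record) {
        downstream.collect(copyEnabled ? copier.apply(record) : record);
    }
}

class ChainedCopyDemo {
    // Runs a two-operator chain: the upstream side keeps a reference to the
    // record it emitted, the downstream side mutates its input.
    static int upstreamValueAfterChain(boolean copyEnabled) {
        Reading held = new Reading(1);
        SimpleCollector<Reading> downstream = r -> r.value += 100;
        SimpleCollector<Reading> out =
                new CopyingCollector<>(downstream, Reading::copy, copyEnabled);
        out.collect(held);
        return held.value;
    }

    public static void main(String[] args) {
        System.out.println("with copy:    " + upstreamValueAfterChain(true));
        System.out.println("without copy: " + upstreamValueAfterChain(false));
    }
}
```

With the copy enabled the upstream object keeps its value; without it, the
downstream mutation is visible upstream, which is the behaviour difference
between chained and non-chained operators discussed in this thread.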

On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <pa...@kth.se> wrote:
> I guess it was not intended ^^.
>
> Chaining should be transparent and not break the correct/expected behaviour.
>
>
> Paris?
>
> On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org> wrote:
>
> +1 for copying.
> On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:
>
> Hey,
>
> The latest streaming operator rework removed the copying of the outputs
> before passing them to chained operators. This is a major break for the
> previous operator semantics which guaranteed immutability.
>
> I think this change leads to very indeterministic program behaviour from
> the user's perspective as only non-chained outputs/inputs will be mutable.
> If we allow this to happen, users will start disabling chaining to get
> immutability which defeats the purpose. (chaining should not affect program
> behaviour just increase performance)
>
> In my opinion the default setting for each operator should be immutability
> and the user could override this manually if he/she wants.
>
> What do you think?
>
> Regards,
> Gyula
>
>

Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Paris Carbone <pa...@kth.se>.
I guess it was not intended ^^.

Chaining should be transparent and not break the correct/expected behaviour.


Paris?

On 20 May 2015, at 11:02, Márton Balassi <mb...@apache.org> wrote:

+1 for copying.
On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:

Hey,

The latest streaming operator rework removed the copying of the outputs
before passing them to chained operators. This is a major break for the
previous operator semantics which guaranteed immutability.

I think this change leads to very indeterministic program behaviour from
the user's perspective as only non-chained outputs/inputs will be mutable.
If we allow this to happen, users will start disabling chaining to get
immutability which defeats the purpose. (chaining should not affect program
behaviour just increase performance)

In my opinion the default setting for each operator should be immutability
and the user could override this manually if he/she wants.

What do you think?

Regards,
Gyula



Re: [DISCUSS] Re-add record copy to chained operator calls

Posted by Márton Balassi <mb...@apache.org>.
+1 for copying.
On May 20, 2015 10:50 AM, "Gyula Fóra" <gy...@apache.org> wrote:

> Hey,
>
> The latest streaming operator rework removed the copying of the outputs
> before passing them to chained operators. This is a major break for the
> previous operator semantics which guaranteed immutability.
>
> I think this change leads to very indeterministic program behaviour from
> the user's perspective as only non-chained outputs/inputs will be mutable.
> If we allow this to happen, users will start disabling chaining to get
> immutability which defeats the purpose. (chaining should not affect program
> behaviour just increase performance)
>
> In my opinion the default setting for each operator should be immutability
> and the user could override this manually if he/she wants.
>
> What do you think?
>
> Regards,
> Gyula
>