Posted to dev@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2016/11/21 10:30:03 UTC

[DISCUSS] Hold copies in HeapStateBackend

Hi everybody,

when implementing a ReduceFunction for incremental aggregation of SQL /
Table API window aggregates, we noticed that the HeapStateBackend does not
store copies but holds references to the original objects. In the case of a
SlidingWindow, the same object is referenced from different window panes.
Therefore, it is not possible to modify these objects in place (which we
would like to do in order to avoid object instantiations, see discussion [1]).

Other state backends serialize their data, so the behavior is not
consistent across backends.
If we want to have light-weight tests, we have to create new objects in the
ReduceFunction, which causes unnecessary overhead.

I would propose to copy objects when storing them in a HeapStateBackend.
This would ensure that objects returned from state to the user behave
identically across state backends.

We created a related JIRA [2] that asks to copy records that go into an
incremental ReduceFunction. Its scope is narrower and it would solve our
problem, but it would leave the inconsistent behavior of state backends in
place.

What do others think?

Cheers, Fabian

[1] https://github.com/apache/flink/pull/2792#discussion_r88653721
[2] https://issues.apache.org/jira/browse/FLINK-5105
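
The aliasing problem described above can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: Sum, PaneState and the pane names are hypothetical stand-ins, and the copyOnFirstStore flag only mimics the idea of copying a value when it is first put into on-heap state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

class PaneAliasingDemo {

    /** Mutable aggregate, standing in for a reusable record (hypothetical type). */
    static final class Sum {
        long value;
        Sum(long value) { this.value = value; }
        Sum copy() { return new Sum(value); }
    }

    /** Hypothetical on-heap reducing state with one entry per window pane. */
    static final class PaneState {
        private final Map<String, Sum> panes = new HashMap<>();
        private final boolean copyOnFirstStore;
        private final BinaryOperator<Sum> reducer;

        PaneState(boolean copyOnFirstStore, BinaryOperator<Sum> reducer) {
            this.copyOnFirstStore = copyOnFirstStore;
            this.reducer = reducer;
        }

        void add(String pane, Sum in) {
            Sum current = panes.get(pane);
            if (current == null) {
                // Without the copy, every pane ends up holding the same input object.
                panes.put(pane, copyOnFirstStore ? in.copy() : in);
            } else {
                panes.put(pane, reducer.apply(current, in));
            }
        }

        long get(String pane) { return panes.get(pane).value; }
    }

    /** Adds the records 1 and 2 to two overlapping panes and returns both sums. */
    static long[] run(boolean copyOnFirstStore) {
        // Reducer that avoids allocations by mutating and returning its first input.
        BinaryOperator<Sum> reuseFirst = (a, b) -> { a.value += b.value; return a; };
        PaneState state = new PaneState(copyOnFirstStore, reuseFirst);
        for (long v : new long[] {1, 2}) {
            Sum record = new Sum(v);
            state.add("pane-1", record);
            state.add("pane-2", record);
        }
        return new long[] { state.get("pane-1"), state.get("pane-2") };
    }

    public static void main(String[] args) {
        long[] shared = run(false);
        long[] copied = run(true);
        System.out.println("references: " + shared[0] + ", " + shared[1]);
        System.out.println("copies:     " + copied[0] + ", " + copied[1]);
    }
}
```

When references are stored, both panes point to the same object, so the second record is added to it twice and both panes report 5 instead of 3. With a copy on first store, each pane reports 3.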

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Greg,

yes, we could, and as Aljoscha pointed out, the proper solution would be to
serialize all objects (as done in the DataSet API) and not hold them as
objects on the heap.
This would be a major effort though, and I am rather looking for a 'quick'
workaround that does not have major side effects.

2016-11-23 17:07 GMT+01:00 Greg Hogan <co...@greghogan.com>:

> On Wed, Nov 23, 2016 at 8:34 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> > The ReduceFunction must be used in the right way and it is easy to get
> > wrong.
> >
>
> I'm likely highlighting my ignorance here, but if object reuse works
> properly for ReduceFunction in the batch API, can we do the same in the
> streaming API?
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Greg Hogan <co...@greghogan.com>.
On Wed, Nov 23, 2016 at 8:34 AM, Fabian Hueske <fh...@gmail.com> wrote:

> The ReduceFunction must be used in the right way and it is easy to get
> wrong.
>

I'm likely highlighting my ignorance here, but if object reuse works
properly for ReduceFunction in the batch API, can we do the same in the
streaming API?

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, you're right. This is not a principled solution but rather a
work-around for a specific use case.

The ReduceFunction must be used in the right way and it is easy to get
wrong. (OTOH, there is currently no way to get object reuse right, so I
think the change would not worsen the current state.)
That's why I would not expose this to users.
However, internal functions could rely on this if it is backed by tests,
IMO.

Btw. the HeapFoldingState works with a fresh copy of the default value.
So in some sense, the change would make the behavior of incremental updates
more consistent ;-)

I'll open a JIRA to improve the documentation.

2016-11-23 14:23 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> You can go ahead and do the change. I just think that this is quite
> fragile. For example, this depends on the reduce function returning the
> right object for reuse. If we hand in the copied object as the first input
> and the ReduceFunction reuses the second input then we again have a
> reference to the same (input) object in the state. Several components have
> to work together to make this intricate dance work and if someone changes
> the order in the reduce function for some reason this might break.
>
> Big +1 on documenting the shortcomings. :-)

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Aljoscha Krettek <al...@apache.org>.
You can go ahead and do the change. I just think that this is quite
fragile. For example, this depends on the reduce function returning the
right object for reuse. If we hand in the copied object as the first input
and the ReduceFunction reuses the second input then we again have a
reference to the same (input) object in the state. Several components have
to work together to make this intricate dance work and if someone changes
the order in the reduce function for some reason this might break.

Big +1 on documenting the shortcomings. :-)
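
The fragility described above can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not Flink code: CopyFirstReducingState is a hypothetical state that copies only the first value it receives, and REUSE_FIRST / REUSE_SECOND are two hypothetical reducers that differ only in which input they mutate and return.

```java
import java.util.function.BinaryOperator;

class ReduceOrderDemo {

    /** Mutable aggregate (hypothetical type). */
    static final class Sum {
        long value;
        Sum(long value) { this.value = value; }
    }

    /** Hypothetical state that copies only the first value put into it. */
    static final class CopyFirstReducingState {
        private final BinaryOperator<Sum> reducer;
        private Sum current;

        CopyFirstReducingState(BinaryOperator<Sum> reducer) { this.reducer = reducer; }

        void add(Sum in) {
            // The state value is handed in as the first input of the reducer.
            current = (current == null) ? new Sum(in.value) : reducer.apply(current, in);
        }

        Sum get() { return current; }
    }

    /** Mutates and returns the first input, i.e. the copy owned by the state. */
    static final BinaryOperator<Sum> REUSE_FIRST =
            (a, b) -> { a.value += b.value; return a; };

    /** Mutates and returns the second input, i.e. the caller's record. */
    static final BinaryOperator<Sum> REUSE_SECOND =
            (a, b) -> { b.value += a.value; return b; };

    /** Returns true if the state ends up referencing one of the input records. */
    static boolean stateAliasesInput(BinaryOperator<Sum> reducer) {
        CopyFirstReducingState state = new CopyFirstReducingState(reducer);
        Sum first = new Sum(1);
        Sum second = new Sum(2);
        state.add(first);
        state.add(second);
        return state.get() == first || state.get() == second;
    }

    public static void main(String[] args) {
        System.out.println("reuse first input:  aliased = " + stateAliasesInput(REUSE_FIRST));
        System.out.println("reuse second input: aliased = " + stateAliasesInput(REUSE_SECOND));
    }
}
```

Both reducers compute the same sum, but only the one that reuses its first input keeps the state independent of the caller's records. Swapping the roles of the two arguments silently reintroduces the shared reference.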

On Wed, 23 Nov 2016 at 12:34 Fabian Hueske <fh...@gmail.com> wrote:

> Hi Aljoscha,
>
> sure, there many issues with holding the state as objects on the heap.
> However, I think we don't have to solve all problems related to that in
> order to add a small fix that solves one specific issue.
> I would not explicitly expose the fix to users but it would be nice if we
> could implement more efficient code for internal functions.
>
> Moreover, I think we should extend the documentation and clearly point out
> the limitations regarding modifying state objects.
>
> Best, Fabian

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Aljoscha,

sure, there are many issues with holding the state as objects on the heap.
However, I think we don't have to solve all problems related to that in
order to add a small fix that solves one specific issue.
I would not explicitly expose the fix to users but it would be nice if we
could implement more efficient code for internal functions.

Moreover, I think we should extend the documentation and clearly point out
the limitations regarding modifying state objects.

Best, Fabian



2016-11-23 12:07 GMT+01:00 sjk <sh...@163.com>:

> hi,Fabian Hueske, Sorry for mistake for the whole PR #2792

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by sjk <sh...@163.com>.
Hi Fabian Hueske, sorry for the mistake about the whole PR #2792.

> On Nov 23, 2016, at 17:10, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi,
> 
> Why do you think that this means "much code changes"?
> I think it would actually be a pretty lightweight change in
> HeapReducingState.
> 
> The proposal is to copy the *first* value that goes into a ReducingState.
> The copy would be done by a TypeSerializer and hence be a deep copy.
> This will allow to reuse the copy in each invocation of the ReduceFunction
> instead of creating a new result object of the same type that was initially
> copied.
> 
> I think the savings of reusing the object in each invocation of the
> ReduceFunction and not creating a new object should amortize the one-time
> object copy.
> 
> Fabian
> 
> 2016-11-23 3:04 GMT+01:00 sjk <sh...@163.com>:
> 
>> Hi, Fabian
>> 
>> So much code changes. Can you show us the key changes code for the object
>> copy?
>> Object reference maybe hold more deep reference, it can be a bomb.
>> Can we renew a object with its data or direct use kryo for object
>> serialization?
>> I’m not prefer object copy.
>> 
>> 
>>> On Nov 22, 2016, at 20:33, Fabian Hueske <fh...@gmail.com> wrote:
>>> 
>>> Does anybody have objections against copying the first record that goes
>>> into the ReduceState?
>>> 
>>> 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>>> 
>>>> That's right, yes.
>>>> 
>>>> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fh...@gmail.com> wrote:
>>>> 
>>>>> Right, but that would be a much bigger change than "just" copying the
>>>>> *first* record that goes into the ReduceState, or am I missing
>>>>> something?
>>>>> 
>>>>> 
>>>>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>>>>> 
>>>>>> To bring over my comment from the Github PR that started this
>>>>>> discussion:
>>>>>>
>>>>>> @wuchong <https://github.com/wuchong>, yes this is a problem with the
>>>>>> HeapStateBackend. The RocksDB backend does not suffer from this
>>>>>> problem. I think in the long run we should migrate the
>>>>>> HeapStateBackend to always keep data in serialised form, then we also
>>>>>> won't have this problem anymore.
>>>>>>
>>>>>> So I'm very much in favour of keeping data serialised. Copying data
>>>>>> would only ever be a stopgap solution.
>>>>>> 
>>>>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:
>>>>>> 
>>>>>>> Another approach that would solve the problem for our use case
>>>>>>> (object re-usage for incremental window ReduceFunctions) would be to
>>>>>>> copy the first object that is put into the state.
>>>>>>> This would be a change on the ReduceState, not on the overall state
>>>>>>> backend, which should be feasible, no?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>>>>>> 
>>>>>>>> -1 for copying objects.
>>>>>>>> 
>>>>>>>> Storing a serialized data where possible is good, but copying all
>>>>>>>> objects by default is not a good idea, in my opinion.
>>>>>>>> A lot of scenarios use data types that are hellishly expensive to
>>>>>>>> copy.
>>>>>>>> Even the current copy on chain handover is a problem.
>>>>>>>> 
>>>>>>>> Let's not introduce even more copies.
>>>>>>>> 
>>>>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> it will come with performance overhead when updating the state, but
>>>>>>>>> I think it'll be possible to perform asynchronous snapshots using
>>>>>>>>> HeapStateBackend (probably some changes to underlying data
>>>>>>>>> structures would be needed) - which would bring more predictable
>>>>>>>>> performance.
>>>>>>>>> 
>>>>>>>>> thanks,
>>>>>>>>> maciek
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> I would be in favour of this since it brings things in line with
>>>>>>>>>> the RocksDB backend. This will, however, come with quite the
>>>>>>>>>> performance overhead, depending on how fast the TypeSerializer can
>>>>>>>>>> copy.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>> 
>>>>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everybody,
>>>>>>>>>>>
>>>>>>>>>>> when implementing a ReduceFunction for incremental aggregation of
>>>>>>>>>>> SQL / Table API window aggregates we noticed that the
>>>>>>>>>>> HeapStateBackend does not store copies but holds references to the
>>>>>>>>>>> original objects. In case of a SlidingWindow, the same object is
>>>>>>>>>>> referenced from different window panes.
>>>>>>>>>>> Therefore, it is not possible to modify these objects (in order to
>>>>>>>>>>> avoid object instantiations, see discussion [1]).
>>>>>>>>>>>
>>>>>>>>>>> Other state backends serialize their data such that the behavior is
>>>>>>>>>>> not consistent across backends.
>>>>>>>>>>> If we want to have light-weight tests, we have to create new objects
>>>>>>>>>>> in the ReduceFunction causing unnecessary overhead.
>>>>>>>>>>>
>>>>>>>>>>> I would propose to copy objects when storing them in a
>>>>>>>>>>> HeapStateBackend.
>>>>>>>>>>> This would ensure that objects returned from state to the user
>>>>>>>>>>> behave identical for different state backends.
>>>>>>>>>>>
>>>>>>>>>>> We created a related JIRA [2] that asks to copy records that go into
>>>>>>>>>>> an incremental ReduceFunction. The scope is more narrow and would
>>>>>>>>>>> solve our problem, but would leave the inconsistent behavior of
>>>>>>>>>>> state backends in place.
>>>>>>>>>>>
>>>>>>>>>>> What do others think?
>>>>>>>>>>>
>>>>>>>>>>> Cheers, Fabian
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 
>> 



Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Aljoscha Krettek <al...@apache.org>.
I think it's not a good idea to introduce this special-case fix for one
specific use case because this can have implications for other parts of the
code. We should push for keeping the data in serialised form. There are also
other problems: for example, a ListState allows modifying the returned
Iterable, and this will change the actual contents of the state in the
HeapStateBackend, while in the RocksDB backend it does not change the
state.
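
The ListState difference described above can be illustrated with a small
sketch. It does not use Flink classes: HeapLikeListState and
SerializingListState are hypothetical stand-ins, a List is used instead of an
Iterable for brevity, and plain Java serialization stands in for whatever
serializer a real backend would use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class ListStateAliasingSketch {

    /** Hypothetical heap-style state: get() hands out the stored list itself. */
    static final class HeapLikeListState {
        private final List<String> values = new ArrayList<>();

        void add(String v) { values.add(v); }

        List<String> get() { return values; } // live reference, no copy
    }

    /** Hypothetical serializing state: only bytes are kept, get() builds a fresh list. */
    static final class SerializingListState {
        private byte[] bytes = toBytes(new ArrayList<String>());

        void add(String v) {
            ArrayList<String> current = fromBytes(bytes);
            current.add(v);
            bytes = toBytes(current);
        }

        List<String> get() { return fromBytes(bytes); }
    }

    static byte[] toBytes(ArrayList<String> list) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(list);
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @SuppressWarnings("unchecked")
    static ArrayList<String> fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (ArrayList<String>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Adds one element through the state, one through the returned list. */
    public static int heapSizeAfterExternalAdd() {
        HeapLikeListState state = new HeapLikeListState();
        state.add("a");
        state.get().add("b"); // user code mutates the returned collection
        return state.get().size();
    }

    /** Same steps against the serializing variant. */
    public static int serializedSizeAfterExternalAdd() {
        SerializingListState state = new SerializingListState();
        state.add("a");
        state.get().add("b"); // only the deserialized copy is changed
        return state.get().size();
    }

    public static void main(String[] args) {
        System.out.println("heap-like:   " + heapSizeAfterExternalAdd());
        System.out.println("serializing: " + serializedSizeAfterExternalAdd());
    }
}
```

In this sketch the heap-like variant ends up with two elements and the
serializing variant with one, which is the kind of divergence between
backends that the message is concerned about.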

On Wed, 23 Nov 2016 at 10:11 Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> Why do you think that this means "much code changes"?
> I think it would actually be a pretty lightweight change in
> HeapReducingState.
>
> The proposal is to copy the *first* value that goes into a ReducingState.
> The copy would be done by a TypeSerializer and hence be a deep copy.
> This will allow to reuse the copy in each invocation of the ReduceFunction
> instead of creating a new result object of the same type that was initially
> copied.
>
> I think the savings of reusing the object in each invocation of the
> ReduceFunction and not creating a new object should amortize the one-time
> object copy.
>
> Fabian
>
> 2016-11-23 3:04 GMT+01:00 sjk <sh...@163.com>:
>
> > Hi, Fabian
> >
> > So much code changes. Can you show us the key changes code for the object
> > copy?
> > Object reference maybe hold more deep reference, it can be a bomb.
> > Can we renew a object with its data or direct use kryo for object
> > serialization?
> > I’m not prefer object copy.
> >
> >
> > > On Nov 22, 2016, at 20:33, Fabian Hueske <fh...@gmail.com> wrote:
> > >
> > > Does anybody have objections against copying the first record that goes
> > > into the ReduceState?
> > >
> > > 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> > >
> > >> That's right, yes.
> > >>
> > >> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fh...@gmail.com> wrote:
> > >>
> > >>> Right, but that would be a much bigger change than "just" copying the
> > >>> *first* record that goes into the ReduceState, or am I missing
> > something?
> > >>>
> > >>>
> > >>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> > >>>
> > >>>> To bring over my comment from the Github PR that started this
> > >> discussion:
> > >>>>
> > >>>> @wuchong <https://github.com/wuchong>, yes this is a problem with
> the
> > >>>> HeapStateBackend. The RocksDB backend does not suffer from this
> > >> problem.
> > >>> I
> > >>>> think in the long run we should migrate the HeapStateBackend to
> always
> > >>> keep
> > >>>> data in serialised form, then we also won't have this problem
> anymore.
> > >>>>
> > >>>> So I'm very much in favour of keeping data serialised. Copying data
> > >> would
> > >>>> only ever be a stopgap solution.
> > >>>>
> > >>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com>
> wrote:
> > >>>>
> > >>>>> Another approach that would solve the problem for our use case
> > >> (object
> > >>>>> re-usage for incremental window ReduceFunctions) would be to copy
> the
> > >>>> first
> > >>>>> object that is put into the state.
> > >>>>> This would be a change on the ReduceState, not on the overall state
> > >>>>> backend, which should be feasible, no?
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > >>>>>
> > >>>>>> -1 for copying objects.
> > >>>>>>
> > >>>>>> Storing a serialized data where possible is good, but copying all
> > >>>> objects
> > >>>>>> by default is not a good idea, in my opinion.
> > >>>>>> A lot of scenarios use data types that are hellishly expensive to
> > >>> copy.
> > >>>>>> Even the current copy on chain handover is a problem.
> > >>>>>>
> > >>>>>> Let's not introduce even more copies.
> > >>>>>>
> > >>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl>
> > >>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> it will come with performance overhead when updating the state,
> > >>> but I
> > >>>>>>> think it'll be possible to perform asynchronous snapshots using
> > >>>>>>> HeapStateBackend (probably some changes to underlying data
> > >>> structures
> > >>>>>> would
> > >>>>>>> be needed) - which would bring more predictable performance.
> > >>>>>>>
> > >>>>>>> thanks,
> > >>>>>>> maciek
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>> I would be in favour of this since it brings things in line with
> > >>> the
> > >>>>>>>> RocksDB backend. This will, however, come with quite the
> > >>> performance
> > >>>>>>>> overhead, depending on how fast the TypeSerializer can copy.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Aljoscha
> > >>>>>>>>
> > >>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com>
> > >>>> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi everybody,
> > >>>>>>>>>
> > >>>>>>>>> when implementing a ReduceFunction for incremental aggregation
> > >> of
> > >>>>> SQL /
> > >>>>>>>>> Table API window aggregates we noticed that the
> > >> HeapStateBackend
> > >>>> does
> > >>>>>> not
> > >>>>>>>>> store copies but holds references to the original objects. In
> > >>> case
> > >>>>> of a
> > >>>>>>>>> SlidingWindow, the same object is referenced from different
> > >>> window
> > >>>>>> panes.
> > >>>>>>>>> Therefore, it is not possible to modify these objects (in order
> > >>> to
> > >>>>>> avoid
> > >>>>>>>>> object instantiations, see discussion [1]).
> > >>>>>>>>>
> > >>>>>>>>> Other state backends serialize their data such that the
> > >> behavior
> > >>> is
> > >>>>> not
> > >>>>>>>>> consistent across backends.
> > >>>>>>>>> If we want to have light-weight tests, we have to create new
> > >>>> objects
> > >>>>> in
> > >>>>>>>>> the
> > >>>>>>>>> ReduceFunction causing unnecessary overhead.
> > >>>>>>>>>
> > >>>>>>>>> I would propose to copy objects when storing them in a
> > >>>>>> HeapStateBackend.
> > >>>>>>>>> This would ensure that objects returned from state to the user
> > >>>> behave
> > >>>>>>>>> identical for different state backends.
> > >>>>>>>>>
> > >>>>>>>>> We created a related JIRA [2] that asks to copy records that go
> > >>>> into
> > >>>>> an
> > >>>>>>>>> incremental ReduceFunction. The scope is more narrow and would
> > >>>> solve
> > >>>>>> our
> > >>>>>>>>> problem, but would leave the inconsistent behavior of state
> > >>>> backends
> > >>>>> in
> > >>>>>>>>> place.
> > >>>>>>>>>
> > >>>>>>>>> What do others think?
> > >>>>>>>>>
> > >>>>>>>>> Cheers, Fabian
> > >>>>>>>>>
> > >>>>>>>>> [1]
> > >>> https://github.com/apache/flink/pull/2792#discussion_r88653721
> > >>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
> >
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Why do you think that this means "much code changes"?
I think it would actually be a pretty lightweight change in
HeapReducingState.

The proposal is to copy the *first* value that goes into a ReducingState.
The copy would be done by a TypeSerializer and hence be a deep copy.
This would allow the copy to be reused in each invocation of the ReduceFunction
instead of creating a new result object of the same type as the one that was
initially copied.

I think the savings of reusing the object in each invocation of the
ReduceFunction and not creating a new object should amortize the one-time
object copy.
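
A minimal sketch of the idea, not the actual HeapReducingState code: the
classes Sum and ReducingPane below are hypothetical, and a UnaryOperator
stands in for the deep copy that a TypeSerializer would perform. The same
record is added to two panes, as a sliding window would do, and the reduce
function updates its first argument in place.

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

public class CopyFirstReducingStateSketch {

    /** Mutable accumulator type, chosen so that aliasing becomes visible. */
    public static final class Sum {
        public long value;

        public Sum(long value) { this.value = value; }
    }

    /** Hypothetical heap-held reducing state for one window pane. */
    public static final class ReducingPane {
        private final BinaryOperator<Sum> reduce;
        private final UnaryOperator<Sum> copier; // stand-in for a serializer-based deep copy
        private Sum current;

        public ReducingPane(BinaryOperator<Sum> reduce, UnaryOperator<Sum> copier) {
            this.reduce = reduce;
            this.copier = copier;
        }

        public void add(Sum input) {
            if (current == null) {
                current = copier.apply(input); // only the first value is copied
            } else {
                current = reduce.apply(current, input); // later values reuse the accumulator
            }
        }

        public Sum get() { return current; }
    }

    /** Reduce function that mutates and returns its first argument instead of allocating. */
    static final BinaryOperator<Sum> IN_PLACE_ADD = (acc, in) -> {
        acc.value += in.value;
        return acc;
    };

    /** Feeds the records 1 and 2 into two panes and returns both pane results. */
    public static long[] run(boolean copyFirst) {
        UnaryOperator<Sum> copier;
        if (copyFirst) {
            copier = s -> new Sum(s.value);
        } else {
            copier = s -> s; // current behaviour: keep the reference
        }
        ReducingPane paneA = new ReducingPane(IN_PLACE_ADD, copier);
        ReducingPane paneB = new ReducingPane(IN_PLACE_ADD, copier);

        Sum first = new Sum(1);
        Sum second = new Sum(2);
        paneA.add(first);
        paneB.add(first);  // same object lands in both panes
        paneA.add(second);
        paneB.add(second);
        return new long[] { paneA.get().value, paneB.get().value };
    }

    public static void main(String[] args) {
        long[] withCopy = run(true);
        long[] withoutCopy = run(false);
        System.out.println("copy first value: " + withCopy[0] + ", " + withCopy[1]);
        System.out.println("keep reference:   " + withoutCopy[0] + ", " + withoutCopy[1]);
    }
}
```

With the copy, both panes hold 3 (1 + 2). Without it, both panes share one
accumulator that is incremented twice and ends up at 5, so the in-place
ReduceFunction is only safe when the first value is copied.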

Fabian

2016-11-23 3:04 GMT+01:00 sjk <sh...@163.com>:

> Hi, Fabian
>
> So much code changes. Can you show us the key changes code for the object
> copy?
> Object reference maybe hold more deep reference, it can be a bomb.
> Can we renew a object with its data or direct use kryo for object
> serialization?
> I’m not prefer object copy.
>
>
> > On Nov 22, 2016, at 20:33, Fabian Hueske <fh...@gmail.com> wrote:
> >
> > Does anybody have objections against copying the first record that goes
> > into the ReduceState?
> >
> > 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> >
> >> That's right, yes.
> >>
> >> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fh...@gmail.com> wrote:
> >>
> >>> Right, but that would be a much bigger change than "just" copying the
> >>> *first* record that goes into the ReduceState, or am I missing
> something?
> >>>
> >>>
> >>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> >>>
> >>>> To bring over my comment from the Github PR that started this
> >> discussion:
> >>>>
> >>>> @wuchong <https://github.com/wuchong>, yes this is a problem with the
> >>>> HeapStateBackend. The RocksDB backend does not suffer from this
> >> problem.
> >>> I
> >>>> think in the long run we should migrate the HeapStateBackend to always
> >>> keep
> >>>> data in serialised form, then we also won't have this problem anymore.
> >>>>
> >>>> So I'm very much in favour of keeping data serialised. Copying data
> >> would
> >>>> only ever be a stopgap solution.
> >>>>
> >>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:
> >>>>
> >>>>> Another approach that would solve the problem for our use case
> >> (object
> >>>>> re-usage for incremental window ReduceFunctions) would be to copy the
> >>>> first
> >>>>> object that is put into the state.
> >>>>> This would be a change on the ReduceState, not on the overall state
> >>>>> backend, which should be feasible, no?
> >>>>>
> >>>>>
> >>>>>
> >>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> >>>>>
> >>>>>> -1 for copying objects.
> >>>>>>
> >>>>>> Storing a serialized data where possible is good, but copying all
> >>>> objects
> >>>>>> by default is not a good idea, in my opinion.
> >>>>>> A lot of scenarios use data types that are hellishly expensive to
> >>> copy.
> >>>>>> Even the current copy on chain handover is a problem.
> >>>>>>
> >>>>>> Let's not introduce even more copies.
> >>>>>>
> >>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl>
> >>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> it will come with performance overhead when updating the state,
> >>> but I
> >>>>>>> think it'll be possible to perform asynchronous snapshots using
> >>>>>>> HeapStateBackend (probably some changes to underlying data
> >>> structures
> >>>>>> would
> >>>>>>> be needed) - which would bring more predictable performance.
> >>>>>>>
> >>>>>>> thanks,
> >>>>>>> maciek
> >>>>>>>
> >>>>>>>
> >>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>> I would be in favour of this since it brings things in line with
> >>> the
> >>>>>>>> RocksDB backend. This will, however, come with quite the
> >>> performance
> >>>>>>>> overhead, depending on how fast the TypeSerializer can copy.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi everybody,
> >>>>>>>>>
> >>>>>>>>> when implementing a ReduceFunction for incremental aggregation
> >> of
> >>>>> SQL /
> >>>>>>>>> Table API window aggregates we noticed that the
> >> HeapStateBackend
> >>>> does
> >>>>>> not
> >>>>>>>>> store copies but holds references to the original objects. In
> >>> case
> >>>>> of a
> >>>>>>>>> SlidingWindow, the same object is referenced from different
> >>> window
> >>>>>> panes.
> >>>>>>>>> Therefore, it is not possible to modify these objects (in order
> >>> to
> >>>>>> avoid
> >>>>>>>>> object instantiations, see discussion [1]).
> >>>>>>>>>
> >>>>>>>>> Other state backends serialize their data such that the
> >> behavior
> >>> is
> >>>>> not
> >>>>>>>>> consistent across backends.
> >>>>>>>>> If we want to have light-weight tests, we have to create new
> >>>> objects
> >>>>> in
> >>>>>>>>> the
> >>>>>>>>> ReduceFunction causing unnecessary overhead.
> >>>>>>>>>
> >>>>>>>>> I would propose to copy objects when storing them in a
> >>>>>> HeapStateBackend.
> >>>>>>>>> This would ensure that objects returned from state to the user
> >>>> behave
> >>>>>>>>> identical for different state backends.
> >>>>>>>>>
> >>>>>>>>> We created a related JIRA [2] that asks to copy records that go
> >>>> into
> >>>>> an
> >>>>>>>>> incremental ReduceFunction. The scope is more narrow and would
> >>>> solve
> >>>>>> our
> >>>>>>>>> problem, but would leave the inconsistent behavior of state
> >>>> backends
> >>>>> in
> >>>>>>>>> place.
> >>>>>>>>>
> >>>>>>>>> What do others think?
> >>>>>>>>>
> >>>>>>>>> Cheers, Fabian
> >>>>>>>>>
> >>>>>>>>> [1]
> >>> https://github.com/apache/flink/pull/2792#discussion_r88653721
> >>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by sjk <sh...@163.com>.
Hi, Fabian

So many code changes. Can you show us the key code changes for the object copy?
An object reference may hold further nested references, which can be a bomb.
Can we create a new object from its data, or use Kryo directly for object serialization?
I would prefer not to copy objects.


> On Nov 22, 2016, at 20:33, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Does anybody have objections against copying the first record that goes
> into the ReduceState?
> 
> 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> 
>> That's right, yes.
>> 
>> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fh...@gmail.com> wrote:
>> 
>>> Right, but that would be a much bigger change than "just" copying the
>>> *first* record that goes into the ReduceState, or am I missing something?
>>> 
>>> 
>>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>>> 
>>>> To bring over my comment from the Github PR that started this
>> discussion:
>>>> 
>>>> @wuchong <https://github.com/wuchong>, yes this is a problem with the
>>>> HeapStateBackend. The RocksDB backend does not suffer from this
>> problem.
>>> I
>>>> think in the long run we should migrate the HeapStateBackend to always
>>> keep
>>>> data in serialised form, then we also won't have this problem anymore.
>>>> 
>>>> So I'm very much in favour of keeping data serialised. Copying data
>> would
>>>> only ever be a stopgap solution.
>>>> 
>>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:
>>>> 
>>>>> Another approach that would solve the problem for our use case
>> (object
>>>>> re-usage for incremental window ReduceFunctions) would be to copy the
>>>> first
>>>>> object that is put into the state.
>>>>> This would be a change on the ReduceState, not on the overall state
>>>>> backend, which should be feasible, no?
>>>>> 
>>>>> 
>>>>> 
>>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>>>> 
>>>>>> -1 for copying objects.
>>>>>> 
>>>>>> Storing a serialized data where possible is good, but copying all
>>>> objects
>>>>>> by default is not a good idea, in my opinion.
>>>>>> A lot of scenarios use data types that are hellishly expensive to
>>> copy.
>>>>>> Even the current copy on chain handover is a problem.
>>>>>> 
>>>>>> Let's not introduce even more copies.
>>>>>> 
>>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl>
>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> it will come with performance overhead when updating the state,
>>> but I
>>>>>>> think it'll be possible to perform asynchronous snapshots using
>>>>>>> HeapStateBackend (probably some changes to underlying data
>>> structures
>>>>>> would
>>>>>>> be needed) - which would bring more predictable performance.
>>>>>>> 
>>>>>>> thanks,
>>>>>>> maciek
>>>>>>> 
>>>>>>> 
>>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> I would be in favour of this since it brings things in line with
>>> the
>>>>>>>> RocksDB backend. This will, however, come with quite the
>>> performance
>>>>>>>> overhead, depending on how fast the TypeSerializer can copy.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>>>>> 
>>>>>>>> Hi everybody,
>>>>>>>>> 
>>>>>>>>> when implementing a ReduceFunction for incremental aggregation
>> of
>>>>> SQL /
>>>>>>>>> Table API window aggregates we noticed that the
>> HeapStateBackend
>>>> does
>>>>>> not
>>>>>>>>> store copies but holds references to the original objects. In
>>> case
>>>>> of a
>>>>>>>>> SlidingWindow, the same object is referenced from different
>>> window
>>>>>> panes.
>>>>>>>>> Therefore, it is not possible to modify these objects (in order
>>> to
>>>>>> avoid
>>>>>>>>> object instantiations, see discussion [1]).
>>>>>>>>> 
>>>>>>>>> Other state backends serialize their data such that the
>> behavior
>>> is
>>>>> not
>>>>>>>>> consistent across backends.
>>>>>>>>> If we want to have light-weight tests, we have to create new
>>>> objects
>>>>> in
>>>>>>>>> the
>>>>>>>>> ReduceFunction causing unnecessary overhead.
>>>>>>>>> 
>>>>>>>>> I would propose to copy objects when storing them in a
>>>>>> HeapStateBackend.
>>>>>>>>> This would ensure that objects returned from state to the user
>>>> behave
>>>>>>>>> identical for different state backends.
>>>>>>>>> 
>>>>>>>>> We created a related JIRA [2] that asks to copy records that go
>>>> into
>>>>> an
>>>>>>>>> incremental ReduceFunction. The scope is more narrow and would
>>>> solve
>>>>>> our
>>>>>>>>> problem, but would leave the inconsistent behavior of state
>>>> backends
>>>>> in
>>>>>>>>> place.
>>>>>>>>> 
>>>>>>>>> What do others think?
>>>>>>>>> 
>>>>>>>>> Cheers, Fabian
>>>>>>>>> 
>>>>>>>>> [1]
>>> https://github.com/apache/flink/pull/2792#discussion_r88653721
>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 



Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Does anybody have objections to copying the first record that goes
into the ReduceState?

2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> That's right, yes.
>
> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fh...@gmail.com> wrote:
>
> > Right, but that would be a much bigger change than "just" copying the
> > *first* record that goes into the ReduceState, or am I missing something?
> >
> >
> > 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> >
> > > To bring over my comment from the Github PR that started this
> discussion:
> > >
> > > @wuchong <https://github.com/wuchong>, yes this is a problem with the
> > > HeapStateBackend. The RocksDB backend does not suffer from this
> problem.
> > I
> > > think in the long run we should migrate the HeapStateBackend to always
> > keep
> > > data in serialised form, then we also won't have this problem anymore.
> > >
> > > So I'm very much in favour of keeping data serialised. Copying data
> would
> > > only ever be a stopgap solution.
> > >
> > > On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:
> > >
> > > > Another approach that would solve the problem for our use case
> (object
> > > > re-usage for incremental window ReduceFunctions) would be to copy the
> > > first
> > > > object that is put into the state.
> > > > This would be a change on the ReduceState, not on the overall state
> > > > backend, which should be feasible, no?
> > > >
> > > >
> > > >
> > > > 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > > >
> > > > > -1 for copying objects.
> > > > >
> > > > > Storing a serialized data where possible is good, but copying all
> > > objects
> > > > > by default is not a good idea, in my opinion.
> > > > > A lot of scenarios use data types that are hellishly expensive to
> > copy.
> > > > > Even the current copy on chain handover is a problem.
> > > > >
> > > > > Let's not introduce even more copies.
> > > > >
> > > > > On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > it will come with performance overhead when updating the state,
> > but I
> > > > > > think it'll be possible to perform asynchronous snapshots using
> > > > > > HeapStateBackend (probably some changes to underlying data
> > structures
> > > > > would
> > > > > > be needed) - which would bring more predictable performance.
> > > > > >
> > > > > > thanks,
> > > > > > maciek
> > > > > >
> > > > > >
> > > > > > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >> I would be in favour of this since it brings things in line with
> > the
> > > > > >> RocksDB backend. This will, however, come with quite the
> > performance
> > > > > >> overhead, depending on how fast the TypeSerializer can copy.
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >> Hi everybody,
> > > > > >>>
> > > > > >>> when implementing a ReduceFunction for incremental aggregation
> of
> > > > SQL /
> > > > > >>> Table API window aggregates we noticed that the
> HeapStateBackend
> > > does
> > > > > not
> > > > > >>> store copies but holds references to the original objects. In
> > case
> > > > of a
> > > > > >>> SlidingWindow, the same object is referenced from different
> > window
> > > > > panes.
> > > > > >>> Therefore, it is not possible to modify these objects (in order
> > to
> > > > > avoid
> > > > > >>> object instantiations, see discussion [1]).
> > > > > >>>
> > > > > >>> Other state backends serialize their data such that the
> behavior
> > is
> > > > not
> > > > > >>> consistent across backends.
> > > > > >>> If we want to have light-weight tests, we have to create new
> > > objects
> > > > in
> > > > > >>> the
> > > > > >>> ReduceFunction causing unnecessary overhead.
> > > > > >>>
> > > > > >>> I would propose to copy objects when storing them in a
> > > > > HeapStateBackend.
> > > > > >>> This would ensure that objects returned from state to the user
> > > behave
> > > > > >>> identical for different state backends.
> > > > > >>>
> > > > > >>> We created a related JIRA [2] that asks to copy records that go
> > > into
> > > > an
> > > > > >>> incremental ReduceFunction. The scope is more narrow and would
> > > solve
> > > > > our
> > > > > >>> problem, but would leave the inconsistent behavior of state
> > > backends
> > > > in
> > > > > >>> place.
> > > > > >>>
> > > > > >>> What do others think?
> > > > > >>>
> > > > > >>> Cheers, Fabian
> > > > > >>>
> > > > > >>> [1]
> > https://github.com/apache/flink/pull/2792#discussion_r88653721
> > > > > >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > > > > >>>
> > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Aljoscha Krettek <al...@apache.org>.
That's right, yes.

On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fh...@gmail.com> wrote:

> Right, but that would be a much bigger change than "just" copying the
> *first* record that goes into the ReduceState, or am I missing something?
>
>
> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>
> > To bring over my comment from the Github PR that started this discussion:
> >
> > @wuchong <https://github.com/wuchong>, yes this is a problem with the
> > HeapStateBackend. The RocksDB backend does not suffer from this problem.
> I
> > think in the long run we should migrate the HeapStateBackend to always
> keep
> > data in serialised form, then we also won't have this problem anymore.
> >
> > So I'm very much in favour of keeping data serialised. Copying data would
> > only ever be a stopgap solution.
> >
> > On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:
> >
> > > Another approach that would solve the problem for our use case (object
> > > re-usage for incremental window ReduceFunctions) would be to copy the
> > first
> > > object that is put into the state.
> > > This would be a change on the ReduceState, not on the overall state
> > > backend, which should be feasible, no?
> > >
> > >
> > >
> > > 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > -1 for copying objects.
> > > >
> > > > Storing a serialized data where possible is good, but copying all
> > objects
> > > > by default is not a good idea, in my opinion.
> > > > A lot of scenarios use data types that are hellishly expensive to
> copy.
> > > > Even the current copy on chain handover is a problem.
> > > >
> > > > Let's not introduce even more copies.
> > > >
> > > > On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl>
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > it will come with performance overhead when updating the state,
> but I
> > > > > think it'll be possible to perform asynchronous snapshots using
> > > > > HeapStateBackend (probably some changes to underlying data
> structures
> > > > would
> > > > > be needed) - which would bring more predictable performance.
> > > > >
> > > > > thanks,
> > > > > maciek
> > > > >
> > > > >
> > > > > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > > > >
> > > > >> Hi,
> > > > >> I would be in favour of this since it brings things in line with
> the
> > > > >> RocksDB backend. This will, however, come with quite the
> performance
> > > > >> overhead, depending on how fast the TypeSerializer can copy.
> > > > >>
> > > > >> Cheers,
> > > > >> Aljoscha
> > > > >>
> > > > >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com>
> > wrote:
> > > > >>
> > > > >> Hi everybody,
> > > > >>>
> > > > >>> when implementing a ReduceFunction for incremental aggregation of
> > > SQL /
> > > > >>> Table API window aggregates we noticed that the HeapStateBackend
> > does
> > > > not
> > > > >>> store copies but holds references to the original objects. In
> case
> > > of a
> > > > >>> SlidingWindow, the same object is referenced from different
> window
> > > > panes.
> > > > >>> Therefore, it is not possible to modify these objects (in order
> to
> > > > avoid
> > > > >>> object instantiations, see discussion [1]).
> > > > >>>
> > > > >>> Other state backends serialize their data such that the behavior
> is
> > > not
> > > > >>> consistent across backends.
> > > > >>> If we want to have light-weight tests, we have to create new
> > objects
> > > in
> > > > >>> the
> > > > >>> ReduceFunction causing unnecessary overhead.
> > > > >>>
> > > > >>> I would propose to copy objects when storing them in a
> > > > HeapStateBackend.
> > > > >>> This would ensure that objects returned from state to the user
> > behave
> > > > >>> identical for different state backends.
> > > > >>>
> > > > >>> We created a related JIRA [2] that asks to copy records that go
> > into
> > > an
> > > > >>> incremental ReduceFunction. The scope is more narrow and would
> > solve
> > > > our
> > > > >>> problem, but would leave the inconsistent behavior of state
> > backends
> > > in
> > > > >>> place.
> > > > >>>
> > > > >>> What do others think?
> > > > >>>
> > > > >>> Cheers, Fabian
> > > > >>>
> > > > >>> [1]
> https://github.com/apache/flink/pull/2792#discussion_r88653721
> > > > >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > > > >>>
> > > > >>>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Right, but that would be a much bigger change than "just" copying the
*first* record that goes into the ReduceState, or am I missing something?


2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> To bring over my comment from the Github PR that started this discussion:
>
> @wuchong <https://github.com/wuchong>, yes this is a problem with the
> HeapStateBackend. The RocksDB backend does not suffer from this problem. I
> think in the long run we should migrate the HeapStateBackend to always keep
> data in serialised form, then we also won't have this problem anymore.
>
> So I'm very much in favour of keeping data serialised. Copying data would
> only ever be a stopgap solution.
>
> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:
>
> > Another approach that would solve the problem for our use case (object
> > re-usage for incremental window ReduceFunctions) would be to copy the
> first
> > object that is put into the state.
> > This would be a change on the ReduceState, not on the overall state
> > backend, which should be feasible, no?
> >
> >
> >
> > 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> >
> > > -1 for copying objects.
> > >
> > > Storing a serialized data where possible is good, but copying all
> objects
> > > by default is not a good idea, in my opinion.
> > > A lot of scenarios use data types that are hellishly expensive to copy.
> > > Even the current copy on chain handover is a problem.
> > >
> > > Let's not introduce even more copies.
> > >
> > > On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl> wrote:
> > >
> > > > Hi,
> > > >
> > > > it will come with performance overhead when updating the state, but I
> > > > think it'll be possible to perform asynchronous snapshots using
> > > > HeapStateBackend (probably some changes to underlying data structures
> > > would
> > > > be needed) - which would bring more predictable performance.
> > > >
> > > > thanks,
> > > > maciek
> > > >
> > > >
> > > > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > > >
> > > >> Hi,
> > > >> I would be in favour of this since it brings things in line with the
> > > >> RocksDB backend. This will, however, come with quite the performance
> > > >> overhead, depending on how fast the TypeSerializer can copy.
> > > >>
> > > >> Cheers,
> > > >> Aljoscha
> > > >>
> > > >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com>
> wrote:
> > > >>
> > > >> Hi everybody,
> > > >>>
> > > > >>> when implementing a ReduceFunction for incremental aggregation of
> > > > >>> SQL / Table API window aggregates we noticed that the
> > > > >>> HeapStateBackend does not store copies but holds references to the
> > > > >>> original objects. In case of a SlidingWindow, the same object is
> > > > >>> referenced from different window panes.
> > > > >>> Therefore, it is not possible to modify these objects (in order to
> > > > >>> avoid object instantiations, see discussion [1]).
> > > > >>>
> > > > >>> Other state backends serialize their data such that the behavior is
> > > > >>> not consistent across backends.
> > > > >>> If we want to have light-weight tests, we have to create new
> > > > >>> objects in the ReduceFunction causing unnecessary overhead.
> > > > >>>
> > > > >>> I would propose to copy objects when storing them in a
> > > > >>> HeapStateBackend.
> > > > >>> This would ensure that objects returned from state to the user
> > > > >>> behave identical for different state backends.
> > > > >>>
> > > > >>> We created a related JIRA [2] that asks to copy records that go
> > > > >>> into an incremental ReduceFunction. The scope is more narrow and
> > > > >>> would solve our problem, but would leave the inconsistent behavior
> > > > >>> of state backends in place.
> > > >>>
> > > >>> What do others think?
> > > >>>
> > > >>> Cheers, Fabian
> > > >>>
> > > >>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> > > >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > > >>>
> > > >>>
> > > >
> > >
> >
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Aljoscha Krettek <al...@apache.org>.
To bring over my comment from the GitHub PR that started this discussion:

@wuchong <https://github.com/wuchong>, yes this is a problem with the
HeapStateBackend. The RocksDB backend does not suffer from this problem. I
think in the long run we should migrate the HeapStateBackend to always keep
data in serialised form, then we also won't have this problem anymore.

So I'm very much in favour of keeping data serialised. Copying data would
only ever be a stopgap solution.
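
As a rough illustration of why keeping state in serialised form avoids the
aliasing problem, here is a minimal Java sketch. It is not Flink code:
SerializedLongArrayState is a hypothetical class, and the ByteBuffer encoding
only stands in for whatever a TypeSerializer would do. Because only bytes are
stored, neither the object passed in nor the object handed back shares memory
with the state.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical state that keeps values as bytes only (not a Flink class). */
final class SerializedLongArrayState {
    private final Map<String, byte[]> bytesByKey = new HashMap<>();

    /** Serialises the value; the caller's array is never retained. */
    void put(String key, long[] value) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * value.length);
        for (long v : value) {
            buf.putLong(v);
        }
        bytesByKey.put(key, buf.array());
    }

    /** Deserialises into a fresh array on every read; null if the key is absent. */
    long[] get(String key) {
        byte[] bytes = bytesByKey.get(key);
        if (bytes == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long[] value = new long[bytes.length / Long.BYTES];
        for (int i = 0; i < value.length; i++) {
            value[i] = buf.getLong();
        }
        return value;
    }
}
```

The price is a serialisation step on every put and a deserialisation step on
every get, which is the overhead discussed elsewhere in this thread.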

On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fh...@gmail.com> wrote:

> Another approach that would solve the problem for our use case (object
> re-usage for incremental window ReduceFunctions) would be to copy the first
> object that is put into the state.
> This would be a change on the ReduceState, not on the overall state
> backend, which should be feasible, no?
>
>
>
> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
> > -1 for copying objects.
> >
> > Storing serialized data where possible is good, but copying all objects
> > by default is not a good idea, in my opinion.
> > A lot of scenarios use data types that are hellishly expensive to copy.
> > Even the current copy on chain handover is a problem.
> >
> > Let's not introduce even more copies.
> >
> > On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl> wrote:
> >
> > > Hi,
> > >
> > > it will come with performance overhead when updating the state, but I
> > > think it'll be possible to perform asynchronous snapshots using
> > > HeapStateBackend (probably some changes to underlying data structures
> > > would be needed) - which would bring more predictable performance.
> > >
> > > thanks,
> > > maciek
> > >
> > >
> > > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > >
> > >> Hi,
> > >> I would be in favour of this since it brings things in line with the
> > >> RocksDB backend. This will, however, come with quite the performance
> > >> overhead, depending on how fast the TypeSerializer can copy.
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com> wrote:
> > >>
> > >> Hi everybody,
> > >>>
> > >>> when implementing a ReduceFunction for incremental aggregation of
> > >>> SQL / Table API window aggregates we noticed that the HeapStateBackend
> > >>> does not store copies but holds references to the original objects. In
> > >>> case of a SlidingWindow, the same object is referenced from different
> > >>> window panes.
> > >>> Therefore, it is not possible to modify these objects (in order to
> > >>> avoid object instantiations, see discussion [1]).
> > >>>
> > >>> Other state backends serialize their data such that the behavior is
> > >>> not consistent across backends.
> > >>> If we want to have light-weight tests, we have to create new objects
> > >>> in the ReduceFunction causing unnecessary overhead.
> > >>>
> > >>> I would propose to copy objects when storing them in a
> > >>> HeapStateBackend.
> > >>> This would ensure that objects returned from state to the user behave
> > >>> identical for different state backends.
> > >>>
> > >>> We created a related JIRA [2] that asks to copy records that go into
> > >>> an incremental ReduceFunction. The scope is more narrow and would solve
> > >>> our problem, but would leave the inconsistent behavior of state
> > >>> backends in place.
> > >>>
> > >>> What do others think?
> > >>>
> > >>> Cheers, Fabian
> > >>>
> > >>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> > >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > >>>
> > >>>
> > >
> >
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Fabian Hueske <fh...@gmail.com>.
Another approach that would solve the problem for our use case (object
re-usage for incremental window ReduceFunctions) would be to copy the first
object that is put into the state.
This would be a change on the ReduceState, not on the overall state
backend, which should be feasible, no?
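
A minimal sketch of that idea, assuming a simplified reducing state. This is
not Flink's actual ReducingState implementation: CopyOnFirstAddReducingState
and its copier parameter are hypothetical, with the copier standing in for
TypeSerializer#copy. Only the first value stored for a key is copied, so a
ReduceFunction that mutates its first argument never touches an object that
is shared with another window pane.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/** Hypothetical heap-backed reducing state (not a Flink class). */
final class CopyOnFirstAddReducingState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;
    private final UnaryOperator<V> copier; // assumption: plays the role of TypeSerializer#copy

    CopyOnFirstAddReducingState(BinaryOperator<V> reduceFunction, UnaryOperator<V> copier) {
        this.reduceFunction = reduceFunction;
        this.copier = copier;
    }

    /** Adds a non-null value; only the first value per key is copied. */
    void add(K key, V value) {
        V current = values.get(key);
        if (current == null) {
            // First element for this key: store a private copy so that later
            // in-place updates cannot leak into other keys or into the input.
            values.put(key, copier.apply(value));
        } else {
            // The accumulator is already private, so the reduce function may
            // safely mutate and return its first argument.
            values.put(key, reduceFunction.apply(current, value));
        }
    }

    V get(K key) {
        return values.get(key);
    }
}
```

With a mutating reduce function such as (acc, in) -> { acc[0] += in[0];
return acc; } on long[] values, adding the same input object to two keys
("panes") leaves each pane with its own accumulator, at the cost of one copy
per key rather than one copy per update.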



2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:

> -1 for copying objects.
>
> Storing serialized data where possible is good, but copying all objects
> by default is not a good idea, in my opinion.
> A lot of scenarios use data types that are hellishly expensive to copy.
> Even the current copy on chain handover is a problem.
>
> Let's not introduce even more copies.
>
> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl> wrote:
>
> > Hi,
> >
> > it will come with performance overhead when updating the state, but I
> > think it'll be possible to perform asynchronous snapshots using
> > HeapStateBackend (probably some changes to underlying data structures
> > would be needed) - which would bring more predictable performance.
> >
> > thanks,
> > maciek
> >
> >
> > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> >
> >> Hi,
> >> I would be in favour of this since it brings things in line with the
> >> RocksDB backend. This will, however, come with quite the performance
> >> overhead, depending on how fast the TypeSerializer can copy.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com> wrote:
> >>
> >> Hi everybody,
> >>>
> >>> when implementing a ReduceFunction for incremental aggregation of SQL /
> >>> Table API window aggregates we noticed that the HeapStateBackend does
> >>> not store copies but holds references to the original objects. In case
> >>> of a SlidingWindow, the same object is referenced from different window
> >>> panes.
> >>> Therefore, it is not possible to modify these objects (in order to
> >>> avoid object instantiations, see discussion [1]).
> >>>
> >>> Other state backends serialize their data such that the behavior is not
> >>> consistent across backends.
> >>> If we want to have light-weight tests, we have to create new objects in
> >>> the ReduceFunction causing unnecessary overhead.
> >>>
> >>> I would propose to copy objects when storing them in a
> >>> HeapStateBackend.
> >>> This would ensure that objects returned from state to the user behave
> >>> identical for different state backends.
> >>>
> >>> We created a related JIRA [2] that asks to copy records that go into an
> >>> incremental ReduceFunction. The scope is more narrow and would solve
> >>> our problem, but would leave the inconsistent behavior of state backends
> >>> in place.
> >>>
> >>> What do others think?
> >>>
> >>> Cheers, Fabian
> >>>
> >>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> >>>
> >>>
> >
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Stephan Ewen <se...@apache.org>.
-1 for copying objects.

Storing serialized data where possible is good, but copying all objects
by default is not a good idea, in my opinion.
A lot of scenarios use data types that are hellishly expensive to copy.
Even the current copy on chain handover is a problem.

Let's not introduce even more copies.

On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <mp...@touk.pl> wrote:

> Hi,
>
> it will come with performance overhead when updating the state, but I
> think it'll be possible to perform asynchronous snapshots using
> HeapStateBackend (probably some changes to underlying data structures would
> be needed) - which would bring more predictable performance.
>
> thanks,
> maciek
>
>
> On 21/11/2016 13:48, Aljoscha Krettek wrote:
>
>> Hi,
>> I would be in favour of this since it brings things in line with the
>> RocksDB backend. This will, however, come with quite the performance
>> overhead, depending on how fast the TypeSerializer can copy.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi everybody,
>>>
>>> when implementing a ReduceFunction for incremental aggregation of SQL /
>>> Table API window aggregates we noticed that the HeapStateBackend does not
>>> store copies but holds references to the original objects. In case of a
>>> SlidingWindow, the same object is referenced from different window panes.
>>> Therefore, it is not possible to modify these objects (in order to avoid
>>> object instantiations, see discussion [1]).
>>>
>>> Other state backends serialize their data such that the behavior is not
>>> consistent across backends.
>>> If we want to have light-weight tests, we have to create new objects in
>>> the ReduceFunction causing unnecessary overhead.
>>>
>>> I would propose to copy objects when storing them in a HeapStateBackend.
>>> This would ensure that objects returned from state to the user behave
>>> identical for different state backends.
>>>
>>> We created a related JIRA [2] that asks to copy records that go into an
>>> incremental ReduceFunction. The scope is more narrow and would solve our
>>> problem, but would leave the inconsistent behavior of state backends in
>>> place.
>>>
>>> What do others think?
>>>
>>> Cheers, Fabian
>>>
>>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
>>>
>>>
>

Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Maciek Próchniak <mp...@touk.pl>.
Hi,

it will come with performance overhead when updating the state, but I 
think it'll be possible to perform asynchronous snapshots using 
HeapStateBackend (probably some changes to underlying data structures 
would be needed) - which would bring more predictable performance.

thanks,
maciek

On 21/11/2016 13:48, Aljoscha Krettek wrote:
> Hi,
> I would be in favour of this since it brings things in line with the
> RocksDB backend. This will, however, come with quite the performance
> overhead, depending on how fast the TypeSerializer can copy.
>
> Cheers,
> Aljoscha
>
> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi everybody,
>>
>> when implementing a ReduceFunction for incremental aggregation of SQL /
>> Table API window aggregates we noticed that the HeapStateBackend does not
>> store copies but holds references to the original objects. In case of a
>> SlidingWindow, the same object is referenced from different window panes.
>> Therefore, it is not possible to modify these objects (in order to avoid
>> object instantiations, see discussion [1]).
>>
>> Other state backends serialize their data such that the behavior is not
>> consistent across backends.
>> If we want to have light-weight tests, we have to create new objects in the
>> ReduceFunction causing unnecessary overhead.
>>
>> I would propose to copy objects when storing them in a HeapStateBackend.
>> This would ensure that objects returned from state to the user behave
>> identical for different state backends.
>>
>> We created a related JIRA [2] that asks to copy records that go into an
>> incremental ReduceFunction. The scope is more narrow and would solve our
>> problem, but would leave the inconsistent behavior of state backends in
>> place.
>>
>> What do others think?
>>
>> Cheers, Fabian
>>
>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
>> [2] https://issues.apache.org/jira/browse/FLINK-5105
>>


Re: [DISCUSS] Hold copies in HeapStateBackend

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I would be in favour of this since it brings things in line with the
RocksDB backend. This will, however, come with quite the performance
overhead, depending on how fast the TypeSerializer can copy.

Cheers,
Aljoscha

On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fh...@gmail.com> wrote:

> Hi everybody,
>
> when implementing a ReduceFunction for incremental aggregation of SQL /
> Table API window aggregates we noticed that the HeapStateBackend does not
> store copies but holds references to the original objects. In case of a
> SlidingWindow, the same object is referenced from different window panes.
> Therefore, it is not possible to modify these objects (in order to avoid
> object instantiations, see discussion [1]).
>
> Other state backends serialize their data such that the behavior is not
> consistent across backends.
> If we want to have light-weight tests, we have to create new objects in the
> ReduceFunction causing unnecessary overhead.
>
> I would propose to copy objects when storing them in a HeapStateBackend.
> This would ensure that objects returned from state to the user behave
> identical for different state backends.
>
> We created a related JIRA [2] that asks to copy records that go into an
> incremental ReduceFunction. The scope is more narrow and would solve our
> problem, but would leave the inconsistent behavior of state backends in
> place.
>
> What do others think?
>
> Cheers, Fabian
>
> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> [2] https://issues.apache.org/jira/browse/FLINK-5105
>