Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2017/06/27 17:21:22 UTC

[DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Hi all!

I would like to propose the following FLIP:

FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982

The FLIP is motivated by the fact that many users run into an unnecessary
kind of performance problem caused by an old design artifact.

The required change should be reasonably small, and would help many users
and Flink's general standing.

Happy to hear thoughts!

Stephan

======================================

FLIP text is below. Pictures with illustrations are only in the Wiki, not
supported on the mailing list.
-------------------------------------------------------------------------------------------------

Motivation

The default behavior of the streaming runtime is to copy every element
between chained operators.

This copying was introduced for “safety” reasons, to reduce the number of
cases where users can create incorrect programs by reusing mutable objects
(a discouraged pattern, but possible). For example, when using state
backends that keep the state as objects on the heap, reusing mutable objects
can theoretically create cases where the same object is used in multiple
state mappings.
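
To make the hazard concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink code: the Event type, the HashMap standing in for heap state, and the method name are all made up for illustration. It shows how one reused mutable instance can end up behind two state entries unless it is copied first.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): a reused mutable record stored in heap "state". */
final class AliasingSketch {

    /** Hypothetical mutable event type, used only for illustration. */
    static final class Event {
        String key;
        long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }

        Event copy() {
            return new Event(key, value);
        }
    }

    /**
     * Stores two logical records in a map that stands in for heap state.
     * The upstream "function" reuses one mutable Event instance for both.
     * Returns the value that the state holds for key "a" afterwards.
     */
    static long valueForKeyA(boolean copyBeforeStoring) {
        Map<String, Event> state = new HashMap<>();

        Event reused = new Event("a", 1L);
        state.put(reused.key, copyBeforeStoring ? reused.copy() : reused);

        // The upstream function overwrites the same instance for the next record.
        reused.key = "b";
        reused.value = 2L;
        state.put(reused.key, copyBeforeStoring ? reused.copy() : reused);

        return state.get("a").value;
    }

    public static void main(String[] args) {
        // Without the copy, both state entries point at the same object.
        System.out.println("without copy: " + valueForKeyA(false));
        System.out.println("with copy:    " + valueForKeyA(true));
    }
}
```

Without the copy, the entry for key "a" silently changes when the instance is reused for key "b"; with the copy, each entry keeps its own value.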

The effect is that many people who try Flink get much lower performance
than they could get. From empirical evidence, almost all users
that I (Stephan) have been in touch with eventually run into this issue.

There are multiple observations about that design:


   - Object copies are extremely costly. While some simple types copy virtually
     for free (types reliably detected as immutable are not copied at all), many
     real pipelines use types like Avro, Thrift, JSON, etc., which are very
     expensive to copy.

   - Keyed operations currently only occur after shuffles. These operations are
     hence the first in a pipeline and will never receive a reused object anyway.
     That means that for the most critical operation, this precaution is
     unnecessary.

   - The mode is inconsistent with the contract of the DataSet API, which
     does not copy at each step.

   - To prevent these copies, users can select {{enableObjectReuse()}}, which
     is misleading, since it does not really reuse mutable objects, but avoids
     additional copies.


Proposal

Summary

I propose to change the default behavior of the DataStream runtime to be
the same as the DataSet runtime. That means that new objects are created on
every deserialization, and no copies are made as the objects are passed
along the pipeline.

Details

I propose to drop the execution config flag {{objectReuse}} and instead
introduce an {{ObjectReuseMode}} enumeration with better control of what
should happen. There will be three different modes:


   - DEFAULT
      - This is the default in the DataSet API.
      - This will become the default in the DataStream API.
      - This happens in the DataStream API when {{enableObjectReuse()}} is
        activated.

   - COPY_PER_OPERATOR
      - The current default in the DataStream API.

   - FULL_REUSE
      - This happens in the DataSet API when {{enableObjectReuse()}} is
        chosen.
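
As a rough illustration of how the three modes differ, the following plain-Java sketch counts record allocations for a chain of identity operators. It is a toy model under stated assumptions, not the Flink runtime: only the enum constant names come from the proposal, while the Record type, the allocations method, and the assumption that copies happen at each hand-off between chained operators are illustrative.

```java
/** Toy model of the three proposed modes (not Flink code). */
final class ReuseModeSketch {

    /** Enum constants as named in the proposal. */
    enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /** Hypothetical mutable record type. */
    static final class Record {
        long payload;

        Record(long payload) {
            this.payload = payload;
        }

        Record copy() {
            return new Record(payload);
        }
    }

    /**
     * Pushes `elements` records through a chain of `chainLength` identity
     * operators and returns how many Record instances were allocated.
     */
    static int allocations(ObjectReuseMode mode, int elements, int chainLength) {
        int allocated = 0;
        Record reusable = null;
        for (int i = 0; i < elements; i++) {
            Record current;
            if (mode == ObjectReuseMode.FULL_REUSE) {
                // "Deserialization" overwrites one shared instance.
                if (reusable == null) {
                    reusable = new Record(i);
                    allocated++;
                }
                reusable.payload = i;
                current = reusable;
            } else {
                // A fresh instance per deserialized element.
                current = new Record(i);
                allocated++;
            }
            // Hand-offs between chained operators: chainLength - 1 of them.
            for (int hop = 1; hop < chainLength; hop++) {
                if (mode == ObjectReuseMode.COPY_PER_OPERATOR) {
                    current = current.copy();
                    allocated++;
                }
            }
        }
        return allocated;
    }

    public static void main(String[] args) {
        for (ObjectReuseMode mode : ObjectReuseMode.values()) {
            System.out.println(mode + ": " + allocations(mode, 1000, 4) + " allocations");
        }
    }
}
```

In this model, DEFAULT allocates once per element, COPY_PER_OPERATOR additionally allocates once per hand-off, and FULL_REUSE allocates a single instance overall.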


An illustration of the modes is as follows:

DEFAULT


See here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r

COPY_PER_OPERATOR


See here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks


FULL_REUSE


See here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE

New or Changed Public Interfaces

Interfaces changed

The interface of the {{ExecutionConfig}} adds the method
{{setObjectReuseMode(ObjectReuseMode)}} and deprecates the methods
{{enableObjectReuse()}} and {{disableObjectReuse()}}.


Behavior changed

The default object passing behavior changes, meaning that it can affect the
correctness of prior DataStream programs that assume the original
“COPY_PER_OPERATOR” behavior.

Migration Plan and Compatibility

Interfaces

No interface migration path is needed, because the interfaces are not
broken; some methods are merely deprecated.

Behavior change

Variant 1:

   - Change the behavior, and make it explicit in the release notes that we
     did so and which cases are affected.
   - This may actually be feasible, because the affected cases are
     quite pathological corner cases that only very bad implementations should
     encounter (see below).


Variant 2:

   - When users set the mode, that mode is always used.
   - When the mode is not explicitly set, we follow this strategy:
      - Change the CLI such that we know when users upgrade existing jobs
        (the savepoint to start from has a version prior to 1.4).
      - Use DEFAULT as the default for jobs that do not start from a savepoint,
        or that start from a savepoint of version >= 1.4.
      - Use COPY_PER_OPERATOR as the default for upgraded jobs.
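
The decision logic of Variant 2 could be sketched roughly as follows. This is a plain-Java illustration only: the effectiveMode method, the representation of the savepoint version as a {major, minor} int array, and the use of Optional for "not set" are assumptions, not part of the proposal.

```java
import java.util.Optional;

/** Sketch of the Variant 2 default selection (hypothetical helper, not Flink code). */
final class DefaultModeSketch {

    /** Enum constants as named in the proposal. */
    enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /**
     * @param explicitMode     the mode set by the user, if any
     * @param savepointVersion {major, minor} of the savepoint the job starts
     *                         from, or empty if it does not start from one
     */
    static ObjectReuseMode effectiveMode(Optional<ObjectReuseMode> explicitMode,
                                         Optional<int[]> savepointVersion) {
        // An explicitly configured mode always wins.
        if (explicitMode.isPresent()) {
            return explicitMode.get();
        }
        // A job counts as "upgraded" when its savepoint is older than 1.4.
        boolean upgraded = savepointVersion
                .map(v -> v[0] < 1 || (v[0] == 1 && v[1] < 4))
                .orElse(false);
        return upgraded ? ObjectReuseMode.COPY_PER_OPERATOR : ObjectReuseMode.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(Optional.empty(), Optional.of(new int[] {1, 3})));
        System.out.println(effectiveMode(Optional.empty(), Optional.of(new int[] {1, 4})));
        System.out.println(effectiveMode(Optional.empty(), Optional.empty()));
    }
}
```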

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Aljoscha Krettek <al...@apache.org>.
Stephan and I discussed this for a bit and came to the conclusion that there are actually two different (orthogonal) settings at play here: "object reuse" and "object forwarding behaviour". The former specifies whether we reuse objects when deserialising records from the network; the latter specifies whether to copy elements when forwarding them between operators in a chain.

The "object reuse" setting in ExecutionConfig is currently used for both. In the DataSet API, the setting affects only object reuse; there is never any copying when passing records between operators in a chain. In the DataStream API, the current "object reuse" setting only affects object forwarding behaviour: objects are never reused when deserialising from the network, and the setting only specifies whether we copy records when passing them between operators in a chain.

We think that we should keep the setting for "object reuse" in the ExecutionConfig but only obey it in the DataSet API. We propose to add a new setting, "element forwarding mode". This is ignored in the DataSet API, while the DataStream API is changed to ignore "object reuse" and only obey this new setting. The new setting would (for the time being) be an enum of FORWARD and COPY. We might, in the future, want to extend the DataStream API to also obey the object reuse setting, in order to facilitate unification of the batch and streaming APIs.
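
To illustrate the split into two orthogonal settings, here is a small plain-Java sketch of which API would obey which setting. The type and method names (Api, ElementForwardingMode, Settings and the two predicates) are hypothetical; only the FORWARD and COPY values and the described behaviour are taken from the text above.

```java
/** Sketch of the two orthogonal settings (hypothetical names, not Flink code). */
final class ForwardingSettingsSketch {

    enum Api { DATA_SET, DATA_STREAM }

    /** The proposed new setting, with the two values named above. */
    enum ElementForwardingMode { FORWARD, COPY }

    /** Both settings side by side, as they might sit in a config object. */
    static final class Settings {
        final boolean objectReuse;
        final ElementForwardingMode forwardingMode;

        Settings(boolean objectReuse, ElementForwardingMode forwardingMode) {
            this.objectReuse = objectReuse;
            this.forwardingMode = forwardingMode;
        }
    }

    /** Only the DataSet API obeys "object reuse" when deserialising. */
    static boolean reusesOnDeserialization(Api api, Settings settings) {
        return api == Api.DATA_SET && settings.objectReuse;
    }

    /** Only the DataStream API obeys the forwarding mode; DataSet never copies. */
    static boolean copiesBetweenChainedOperators(Api api, Settings settings) {
        return api == Api.DATA_STREAM
                && settings.forwardingMode == ElementForwardingMode.COPY;
    }

    public static void main(String[] args) {
        Settings settings = new Settings(true, ElementForwardingMode.COPY);
        for (Api api : Api.values()) {
            System.out.println(api
                    + " reuse=" + reusesOnDeserialization(api, settings)
                    + " copy=" + copiesBetweenChainedOperators(api, settings));
        }
    }
}
```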

What do you think? I would update the FLIP accordingly if no-one objects.

Best,
Aljoscha

> On 12. Jul 2017, at 15:46, Stephan Ewen <se...@apache.org> wrote:
> 
> Thanks for the feedback. Will leave this open for some more days, and adopt
> it as a FLIP, taking Greg's and Aljoscha's comments into account.
> 
> On Sun, Jul 2, 2017 at 10:13 PM, Ufuk Celebi <uc...@apache.org> wrote:
> 
>> Thanks for the write up and illustrations. :-) +1 to do this.
>> 
>> I'm fine with both proposed "changed behaviour" variants, but lean
>> towards option 1: change the default, make the change explicit in the
>> release notes and add a good docs page about configuring object reuse
>> (ideally re-using your illustrations from the FLIP).
>> 
>> I see that option 2 (keep COPY_PER_OPERATOR as default for upgraded
>> jobs if nothing else is configured) is nice in order to prevent any
>> surprises for users upgrading from 1.3 to 1.4. But if I understand it
>> correctly we only postpone the problem to their first 1.4 savepoint +
>> restore at which point the behaviour would still change, right? If the
>> answer is yes, I think that this might be more confusing than simply
>> changing the default (option 1) in the long run.
>> 
>> – Ufuk
>> 
>> 
>> On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:
>>> Thank you for the reply and for the support!
>>> 
>>> @Greg, controlling object reuse on a per-operator base is definitely a
>> good
>>> way to follow up. My first thought would be to keep this proposal slim
>> and
>>> deal with the "default" logic, and have a followup effort to make this
>>> controllable per operator.
>>> 
>>> @Greg When you mention the "surprises" about object reuse in the DataSet
>>> API, what cases and behavior do you have in mind there?
>>> 
>>> Stephan
>>> 
>>> 
>>> On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>> 
>>>> +1 for changing the default if so many people encountered problems with
>>>> serialisation costs.
>>>> 
>>>> The first two modes don't require any code changes, correct? Only the last
>>>> one would require changes to the stream input processors.
>>>> 
>>>> We should also keep this issue in mind:
>>>> https://issues.apache.org/jira/browse/FLINK-3974
>>>> i.e. we always need to make shallow copies of the StreamRecord.
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 27. Jun 2017, at 21:01, Zhenzhong Xu <fl...@gmail.com> wrote:
>>>>> 
>>>>> Stephan,
>>>>> 
>>>>> Fully supporting this FLIP. We originally encountered pretty big
>>>>> surprises observing the object copy behavior causing significant
>>>>> performance degradation for our massively parallel use case.
>>>>> 
>>>>> In our case (and I think this SHOULD be the assumption for all
>>>>> streaming use cases), we assume object immutability for all the
>>>>> records throughout the processing pipeline. As a result, I don't see a
>>>>> need to distinguish different object reuse behaviors for different
>>>>> (chained) operators (or, at the very extreme, even the need to support
>>>>> COPY_PER_OPERATOR, other than that we probably have to support it for
>>>>> backward compatibility). I am also a fan of failing fast if the user
>>>>> asserts incorrect assumptions.
>>>>> 
>>>>> One piece of feedback on FLIP-21 itself: I am not very clear on the
>>>>> difference between the DEFAULT and FULL_REUSE enumeration values. Aren't
>>>>> they exactly the same thing in the new proposal? However, the model
>>>>> figures seem to indicate they are slightly different. Can you elaborate
>>>>> a bit more?
>>>>> 
>>>>> Z.
>>>>> 
>>>>> 
>>>>> On 2017-06-27 11:14 (-0700), Greg Hogan <code@greghogan.com> wrote:
>>>>>> Hi Stephan,
>>>>>> 
>>>>>> Would this be an appropriate time to discuss allowing reuse to be a
>>>>>> per-operator configuration? Object reuse for chained operators has led to
>>>>>> considerable surprise for some users of the DataSet API. This came up
>>>>>> during the rework of the object reuse documentation for the DataSet API.
>>>>>> With annotations a Function could mark whether input/iterator or
>>>>>> output/collected objects should be copied or reused.
>>>>>> 
>>>>>> My distant observation is that it is safer to locally assert reuse at
>>>>>> the operator level than to assume or guarantee the safety of object reuse
>>>>>> across an entire program. It could also be handy to mix operators receiving
>>>>>> copyable objects with operators not requiring copyable objects.
>>>>>> 
>>>>>> Greg


Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Stephan Ewen <se...@apache.org>.
Thanks for the feedback. Will leave this open for some more days, and adopt
it as a FLIP, taking Greg's and Aljoscha's comments into account.

On Sun, Jul 2, 2017 at 10:13 PM, Ufuk Celebi <uc...@apache.org> wrote:

> Thanks for the write up and illustrations. :-) +1 to do this.
>
> I'm fine with both proposed "changed behaviour" variants, but lean
> towards option 1: change the default, make the change explicit in the
> release notes and add a good docs page about configuring object reuse
> (ideally re-using your illustrations from the FLIP).
>
> I see that option 2 (keep COPY_PER_OPERATOR as default for upgraded
> jobs if nothing else is configured) is nice in order to prevent any
> surprises for users upgrading from 1.3 to 1.4. But if I understand it
> correctly we only postpone the problem to their first 1.4 savepoint +
> restore at which point the behaviour would still change, right? If the
> answer is yes, I think that this might be more confusing than simply
> changing the default (option 1) in the long run.
>
> – Ufuk
>
>
> On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:
> > Thank you for the reply and for the support!
> >
> > @Greg, controlling object reuse on a per-operator base is definitely a
> good
> > way to follow up. My first thought would be to keep this proposal slim
> and
> > deal with the "default" logic, and have a followup effort to make this
> > controllable per operator.
> >
> > @Greg When you mention the "surprises" about object reuse in the DataSet
> > API, what cases and behavior do you have in mind there?
> >
> > Stephan
> >
> >
> > On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> +1 for changing the default if so many people encountered problems with
> >> serialisation costs.
> >>
> >> The first two modes don’t require any code changes, correct? Only the
> last
> >> one would require changes to the stream input processors.
> >>
> >> We should also keep this issue in mind: https://issues.apache.org/
> >> jira/browse/FLINK-3974 <https://issues.apache.org/
> jira/browse/FLINK-3974>
> >> i.e. we always need to make shallow copies of the StreamRecord.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> > On 27. Jun 2017, at 21:01, Zhenzhong Xu <fl...@gmail.com> wrote:
> >> >
> >> > Stephan,
> >> >
> >> > Fully supporting this FLIP. We originally encountered pretty big
> >> surprises observing the object copy behavior causing significant
> >> performance degradation for our massively parallel use case.
> >> >
> >> > In our case, (I think most appropriately SHOULD be the assumptions for
> >> all streaming use case), is to assume object immutability for all the
> >> records throughout processing pipeline, as a result, I don't see a need
> to
> >> distinguish different object reuse behaviors for different (chained)
> >> operators (or to the very extreme even the need to support
> >> COPY_PER_OPERATOR other than we probably have to support for backward
> >> compatibility). I am also a fan of failing fast if user asserts
> incorrect
> >> assumptions.
> >> >
> >> > One feedback on the FLIP-21 itself, I am not very clear on the
> >> difference between DEFAULT and FULL_REUSE enumeration, aren't them
> exactly
> >> the same thing in new proposal? However, the model figures seem to
> indicate
> >> they are slightly different? Can you elaborate a bit more?
> >> >
> >> > Z.
> >> >
> >> >
> >> > On 2017-06-27 11:14 (-0700), Greg Hogan <code@greghogan.com <mailto:
> >> code@greghogan.com>> wrote:
> >> >> Hi Stephan,
> >> >>
> >> >> Would this be an appropriate time to discuss allowing reuse to be a
> >> per-operator configuration? Object reuse for chained operators has lead
> to
> >> considerable surprise for some users of the DataSet API. This came up
> >> during the rework of the object reuse documentation for the DataSet API.
> >> With annotations a Function could mark whether input/iterator or
> >> output/collected objects should be copied or reused.
> >> >>
> >> >> My distant observation is that is is safer to locally assert reuse at
> >> the operator level than to assume or guarantee the safety of object
> reuse
> >> across an entire program. It could also be handy to mix operators
> receiving
> >> copyable objects with operators not requiring copyable objects.
> >> >>
> >> >> Greg
> >> >>
> >> >>
> >> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> >> >>>
> >> >>> Hi all!
> >> >>>
> >> >>> I would like to propose the following FLIP:
> >> >>>
> >> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> >> action?pageId=71012982
> >> >>>
> >> >>> The FLIP is motivated by the fact that many users run into an
> >> unnecessary
> >> >>> kind of performance problem caused by an old design artifact.
> >> >>>
> >> >>> The required change should be reasonably small, and would help many
> >> users
> >> >>> and Flink's general standing.
> >> >>>
> >> >>> Happy to hear thoughts!
> >> >>>
> >> >>> Stephan
> >> >>>
> >> >>> ======================================
> >> >>>
> >> >>> FLIP text is below. Pictures with illustrations are only in the
> Wiki,
> >> not
> >> >>> supported on the mailing list.
> >> >>> ------------------------------------------------------------
> >> -------------------------------------
> >> >>>
> >> >>> Motivation
> >> >>>
> >> >>> The default behavior of the streaming runtime is to copy every
> element
> >> >>> between chained operators.
> >> >>>
> >> >>> That operation was introduced for “safety† reasons, to avoid the
> >> number of
> >> >>> cases where users can create incorrect programs by reusing mutable
> >> objects
> >> >>> (a discouraged pattern, but possible). For example when using state
> >> >>> backends that keep the state as objects on heap, reusing mutable
> >> objects
> >> >>> can theoretically create cases where the same object is used in
> >> multiple
> >> >>> state mappings.
> >> >>>
> >> >>> The effect is that many people that try Flink get much lower
> >> performance
> >> >>> than they could possibly get. From empirical evidence, almost all
> users
> >> >>> that I (Stephan) have been in touch with eventually run into this
> issue
> >> >>> eventually.
> >> >>>
> >> >>> There are multiple observations about that design:
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  Object copies are extremely costly. While some simple copy
> virtually
> >> for
> >> >>>  free (types reliably detected as immutable are not copied at all),
> >> many
> >> >>>  real pipelines use types like Avro, Thrift, JSON, etc, which are
> very
> >> >>>  expensive to copy.
> >> >>>
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  Keyed operations currently only occur after shuffles. The
> operations
> >> are
> >> >>>  hence the first in a pipeline and will never have a reused object
> >> anyways.
> >> >>>  That means for the most critical operation, this pre-caution is
> >> unnecessary.
> >> >>>
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  The mode is inconsistent with the contract of the DataSet API,
> which
> >> >>>  does not copy at each step
> >> >>>
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  To prevent these copies, users can select {{enableObjectReuse()}},
> >> which
> >> >>>  is misleading, since it does not really reuse mutable objects, but
> >> avoids
> >> >>>  additional copies.
> >> >>>
> >> >>>
> >> >>> Proposal
> >> >>>
> >> >>> Summary
> >> >>>
> >> >>> I propose to change the default behavior of the DataStream runtime
> to
> >> be
> >> >>> the same as the DataSet runtime. That means that new objects are
> >> chosen on
> >> >>> every deserialization, and no copies are made as the objects are
> >> passed on
> >> >>> along the pipelines.
> >> >>>
> >> >>> Details
> >> >>>
> >> >>> I propose to drop the execution config flag {{objectReuse}} and
> instead
> >> >>> introduce an {{ObjectReuseMode}} enumeration with better control of
> >> what
> >> >>> should happen. There will be three different types:
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  DEFAULT
> >> >>>  -
> >> >>>
> >> >>>     This is the default in the DataSet API
> >> >>>     -
> >> >>>
> >> >>>     This will become the default in the DataStream API
> >> >>>     -
> >> >>>
> >> >>>     This happens in the DataStream API when {{enableObjectReuse()}}
> is
> >> >>>     activated.
> >> >>>
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  COPY_PER_OPERATOR
> >> >>>  -
> >> >>>
> >> >>>     The current default in the DataStream API
> >> >>>
> >> >>>
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  FULL_REUSE
> >> >>>  -
> >> >>>
> >> >>>     This happens in the DataSet API when {{enableObjectReuse()}} is
> >> >>>     chosen.
> >> >>>
> >> >>>
> >> >>> An illustration of the modes is as follows:
> >> >>>
> >> >>> DEFAULT
> >> >>>
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> >> action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com
> %
> >> 2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-
> >> xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> <
> >> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&
> >> preview=/https%3A%2F%2Flh5.googleusercontent.com%
> 2F1UOpVB2wSMhx8067IE9t2_
> >> mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-
> >> Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r>
> >> >>>
> >> >>> COPY_PER_OPERATOR
> >> >>>
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> >> action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com
> %
> >> 2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTK
> >> uQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-
> >> iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks <https://cwiki.apache.org/
> >> confluence/pages/viewpage.action?pageId=71012982&
> >> preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-
> >> IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-
> >> sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks>
> >> >>>
> >> >>>
> >> >>> FULL_REUSE
> >> >>>
> >> >>>
> >> >>> See here:
> >> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> >> action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com
> %
> >> 2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-
> fT20B0q7FGDAvAk5oh1h58WtNQktuF
> >> GinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE <
> >> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&
> >> preview=/https%3A%2F%2Flh5.googleusercontent.com%
> >> 2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-
> fT20B0q7FGDAvAk5oh1h58WtNQktuF
> >> GinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE>
> >> >>> New or Changed Public Interfaces
> >> >>>
> >> >>> Interfaces changed
> >> >>>
> >> >>> The interface of the {{ExecutionConfig}} add the method
> >> >>> {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
> >> >>> {{enableObjectReuse()}} and {{disableObjectReuse()}}.
> >> >>>
> >> >>>
> >> >>> Behavior changed
> >> >>>
> >> >>> The default object passing behavior changes, meaning that it can
> >> affect the
> >> >>> correctness of prior DataStream programs that assume the original
> >> >>> “COPY_PER_OPERATOR† behavior.
> >> >>>
> >> >>> Migration Plan and Compatibility
> >> >>>
> >> >>> Interfaces
> >> >>>
> >> >>> No interface migration path is needed, because the interfaces are
> not
> >> >>> broken, merely some methods get deprecated.
> >> >>>
> >> >>> Behavior change
> >> >>>
> >> >>> Variant 1:
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  Change the behavior, make it explicit on the release notes that we
> did
> >> >>>  that and what cases are affected.
> >> >>>  -
> >> >>>
> >> >>>  This may actually be feasible, because the cases that are affected are
> >> >>>  quite pathological corner cases that only very bad implementations
> >> >>>  should encounter (see below)
> >> >>>
> >> >>>
> >> >>> Variant 2:
> >> >>>
> >> >>>  -
> >> >>>
> >> >>>  When users set the mode, always that mode is used.
> >> >>>  -
> >> >>>
> >> >>>  When the mode is not explicitly set, we follow that strategy:
> >> >>>  -
> >> >>>
> >> >>>     Change the CLI such that we know when users upgrade existing jobs
> >> >>>     (the savepoint to start from has a version prior to 1.4).
> >> >>>     -
> >> >>>
> >> >>>     Use DEFAULT as the default for jobs that do not start from savepoint,
> >> >>>     or that start from savepoint >= 1.4
> >> >>>     -
> >> >>>
> >> >>>     Use COPY_PER_OPERATOR as the default for upgraded jobs
> >>
> >>
>

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Ufuk Celebi <uc...@apache.org>.
Thanks for the write up and illustrations. :-) +1 to do this.

I'm fine with both proposed "changed behaviour" variants, but lean
towards option 1: change the default, make the change explicit in the
release notes and add a good docs page about configuring object reuse
(ideally re-using your illustrations from the FLIP).

I see that option 2 (keep COPY_PER_OPERATOR as default for upgraded
jobs if nothing else is configured) is nice in order to prevent any
surprises for users upgrading from 1.3 to 1.4. But if I understand it
correctly we only postpone the problem to their first 1.4 savepoint +
restore at which point the behaviour would still change, right? If the
answer is yes, I think that this might be more confusing than simply
changing the default (option 1) in the long run.
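
The version-dependent default of option 2 can be sketched roughly as
follows. This is a hypothetical plain-Java model, not Flink code: the
`resolve` helper, its parameters, and the encoding of the savepoint version
as a minor-version integer are assumptions made for illustration. Only the
enum constant names are taken from the FLIP.

```java
// Hypothetical model of the "option 2" default selection; not Flink code.
class DefaultModeSketch {

    // Constant names as proposed in the FLIP.
    enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /**
     * @param explicitMode   mode set by the user, or null if nothing was set
     * @param savepointMinor the "x" of the 1.x version that wrote the savepoint
     *                       the job starts from, or null if there is no savepoint
     */
    static ObjectReuseMode resolve(ObjectReuseMode explicitMode, Integer savepointMinor) {
        if (explicitMode != null) {
            return explicitMode;                      // an explicit setting always wins
        }
        if (savepointMinor != null && savepointMinor < 4) {
            return ObjectReuseMode.COPY_PER_OPERATOR; // upgraded job keeps the old behaviour
        }
        return ObjectReuseMode.DEFAULT;               // fresh job, or savepoint >= 1.4
    }

    public static void main(String[] args) {
        // Job upgraded from a 1.3 savepoint, nothing configured.
        System.out.println(resolve(null, 3));
        // The same job, later restored from its first 1.4 savepoint.
        System.out.println(resolve(null, 4));
    }
}
```

Under this reading, the first call resolves to COPY_PER_OPERATOR and the
second to DEFAULT, which is the delayed behaviour change described above.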

– Ufuk


On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:
> Thank you for the reply and for the support!
>
> @Greg, controlling object reuse on a per-operator basis is definitely a good
> way to follow up. My first thought would be to keep this proposal slim and
> deal with the "default" logic, and have a follow-up effort to make this
> controllable per operator.
>
> @Greg When you mention the "surprises" about object reuse in the DataSet
> API, what cases and behavior do you have in mind there?
>
> Stephan

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Stephan Ewen <se...@apache.org>.
Thank you for the reply and for the support!

@Greg, controlling object reuse on a per-operator basis is definitely a good
way to follow up. My first thought would be to keep this proposal slim and
deal with the "default" logic, and have a follow-up effort to make this
controllable per operator.
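
As a rough illustration of what such a per-operator follow-up could look
like, here is a plain-Java sketch. Everything in it is hypothetical: the
`ReuseHint` annotation, the `effectiveMode` helper and the two function
classes do not exist in Flink, and only the mode names are taken from the
FLIP. It assumes that an annotation on the function class overrides the
job-wide mode.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical per-operator override of the job-wide mode; not Flink code.
class PerOperatorReuseSketch {

    enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /** Hypothetical annotation with which a function declares what it needs. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ReuseHint {
        ObjectReuseMode value();
    }

    /** A function that holds on to its inputs and therefore asks for copies. */
    @ReuseHint(ObjectReuseMode.COPY_PER_OPERATOR)
    static class BufferingFunction { }

    /** A function without an annotation simply follows the job-wide mode. */
    static class StatelessFunction { }

    /** Picks the annotated mode if present, otherwise the job-wide mode. */
    static ObjectReuseMode effectiveMode(Class<?> function, ObjectReuseMode jobWide) {
        ReuseHint hint = function.getAnnotation(ReuseHint.class);
        return hint != null ? hint.value() : jobWide;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(BufferingFunction.class, ObjectReuseMode.DEFAULT));
        System.out.println(effectiveMode(StatelessFunction.class, ObjectReuseMode.DEFAULT));
    }
}
```

With a job-wide DEFAULT, the annotated function would get
COPY_PER_OPERATOR and the unannotated one would stay on DEFAULT.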

@Greg When you mention the "surprises" about object reuse in the DataSet
API, what cases and behavior do you have in mind there?

Stephan


On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> +1 for changing the default if so many people encountered problems with
> serialisation costs.
>
> The first two modes don’t require any code changes, correct? Only the last
> one would require changes to the stream input processors.
>
> We should also keep this issue in mind:
> https://issues.apache.org/jira/browse/FLINK-3974
> i.e. we always need to make shallow copies of the StreamRecord.
>
> Best,
> Aljoscha

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Aljoscha Krettek <al...@apache.org>.
+1 for changing the default if so many people encountered problems with serialisation costs.

The first two modes don’t require any code changes, correct? Only the last one would require changes to the stream input processors.

We should also keep this issue in mind: https://issues.apache.org/jira/browse/FLINK-3974 i.e. we always need to make shallow copies of the StreamRecord.
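
To illustrate the shallow-copy point, a minimal plain-Java sketch follows. `Envelope` is a hypothetical stand-in for the record wrapper, not the real StreamRecord class, and the sketch assumes the hazard is one chained consumer overwriting a wrapper that a second consumer still has to read.

```java
// Hypothetical model of a record wrapper fanned out to two consumers; not Flink code.
class ShallowCopySketch {

    /** Wrapper around the user object, standing in for a stream record. */
    static final class Envelope {
        Object value;
        long timestamp;

        Envelope(Object value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        /** New wrapper, same payload object: the user object is NOT copied. */
        Envelope shallowCopy() {
            return new Envelope(value, timestamp);
        }
    }

    /** First consumer: a map-like step that overwrites the wrapper in place. */
    static void mapInPlace(Envelope e) {
        e.value = e.value.toString().length();
    }

    /** Returns what the second consumer observes after the first one has run. */
    static Object fanOut(Envelope in, boolean copyWrapper) {
        mapInPlace(copyWrapper ? in.shallowCopy() : in);
        return in.value;
    }

    public static void main(String[] args) {
        System.out.println(fanOut(new Envelope("abc", 0L), false)); // wrapper shared
        System.out.println(fanOut(new Envelope("abc", 0L), true));  // wrapper copied
    }
}
```

With the shared wrapper, the second consumer sees the first consumer's output (an Integer) instead of the original String. With the shallow copy it still sees "abc", and the payload itself is never duplicated.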

Best,
Aljoscha

> On 27. Jun 2017, at 21:01, Zhenzhong Xu <fl...@gmail.com> wrote:
> 
> Stephan,
> 
> Fully supporting this FLIP. We originally encountered pretty big surprises observing the object copy behavior causing significant performance degradation for our massively parallel use case. 
> 
> In our case (and I think this SHOULD be the assumption for all streaming use cases), we assume object immutability for all records throughout the processing pipeline. As a result, I don't see a need to distinguish different object reuse behaviors for different (chained) operators (or, at the very extreme, even the need to support COPY_PER_OPERATOR, other than that we probably have to support it for backward compatibility). I am also a fan of failing fast if the user asserts incorrect assumptions.
> 
> One piece of feedback on FLIP-21 itself: I am not very clear on the difference between the DEFAULT and FULL_REUSE enumeration values. Aren't they exactly the same thing in the new proposal? However, the model figures seem to indicate they are slightly different. Can you elaborate a bit more?
> 
> Z. 
> 
> 
> On 2017-06-27 11:14 (-0700), Greg Hogan <code@greghogan.com <ma...@greghogan.com>> wrote: 
>> Hi Stephan,
>> 
>> Would this be an appropriate time to discuss allowing reuse to be a per-operator configuration? Object reuse for chained operators has led to considerable surprise for some users of the DataSet API. This came up during the rework of the object reuse documentation for the DataSet API. With annotations a Function could mark whether input/iterator or output/collected objects should be copied or reused.
>> 
>> My distant observation is that it is safer to locally assert reuse at the operator level than to assume or guarantee the safety of object reuse across an entire program. It could also be handy to mix operators receiving copyable objects with operators not requiring copyable objects.
>> 
>> Greg
>> 
>> 
>>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>> 
>>> Hi all!
>>> 
>>> I would like to propose the following FLIP:
>>> 
>>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>>> 
>>> The FLIP is motivated by the fact that many users run into an unnecessary
>>> kind of performance problem caused by an old design artifact.
>>> 
>>> The required change should be reasonably small, and would help many users
>>> and Flink's general standing.
>>> 
>>> Happy to hear thoughts!
>>> 
>>> Stephan
>>> 
>>> ======================================
>>> 
>>> FLIP text is below. Pictures with illustrations are only in the Wiki, not
>>> supported on the mailing list.
>>> -------------------------------------------------------------------------------------------------
>>> 
>>> Motivation
>>> 
>>> The default behavior of the streaming runtime is to copy every element
>>> between chained operators.
>>> 
>>> That operation was introduced for “safety” reasons, to reduce the number of
>>> cases where users can create incorrect programs by reusing mutable objects
>>> (a discouraged pattern, but possible). For example when using state
>>> backends that keep the state as objects on heap, reusing mutable objects
>>> can theoretically create cases where the same object is used in multiple
>>> state mappings.
>>> 
>>> The effect is that many people that try Flink get much lower performance
>>> than they could possibly get. From empirical evidence, almost all users
>>> that I (Stephan) have been in touch with eventually run into this issue.
>>> 
>>> There are multiple observations about that design:
>>> 
>>> 
>>>  -
>>> 
>>>  Object copies are extremely costly. While some simple types copy virtually for
>>>  free (types reliably detected as immutable are not copied at all), many
>>>  real pipelines use types like Avro, Thrift, JSON, etc, which are very
>>>  expensive to copy.
>>> 
>>> 
>>> 
>>>  -
>>> 
>>>  Keyed operations currently only occur after shuffles. The operations are
>>>  hence the first in a pipeline and will never have a reused object anyways.
>>>  That means for the most critical operation, this precaution is unnecessary.
>>> 
>>> 
>>> 
>>>  -
>>> 
>>>  The mode is inconsistent with the contract of the DataSet API, which
>>>  does not copy at each step
>>> 
>>> 
>>> 
>>>  -
>>> 
>>>  To prevent these copies, users can select {{enableObjectReuse()}}, which
>>>  is misleading, since it does not really reuse mutable objects, but avoids
>>>  additional copies.
>>> 
>>> 
>>> Proposal
>>> 
>>> Summary
>>> 
>>> I propose to change the default behavior of the DataStream runtime to be
>>> the same as the DataSet runtime. That means that new objects are chosen on
>>> every deserialization, and no copies are made as the objects are passed on
>>> along the pipelines.
>>> 
>>> Details
>>> 
>>> I propose to drop the execution config flag {{objectReuse}} and instead
>>> introduce an {{ObjectReuseMode}} enumeration with better control of what
>>> should happen. There will be three different types:
>>> 
>>> 
>>>  -
>>> 
>>>  DEFAULT
>>>  -
>>> 
>>>     This is the default in the DataSet API
>>>     -
>>> 
>>>     This will become the default in the DataStream API
>>>     -
>>> 
>>>     This happens in the DataStream API when {{enableObjectReuse()}} is
>>>     activated.
>>> 
>>> 
>>> 
>>>  -
>>> 
>>>  COPY_PER_OPERATOR
>>>  -
>>> 
>>>     The current default in the DataStream API
>>> 
>>> 
>>> 
>>>  -
>>> 
>>>  FULL_REUSE
>>>  -
>>> 
>>>     This happens in the DataSet API when {{enableObjectReuse()}} is
>>>     chosen.
>>> 
>>> 
>>> An illustration of the modes is as follows:
>>> 
>>> DEFAULT
>>> 
>>> 
>>> See here:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
>>> 
>>> COPY_PER_OPERATOR
>>> 
>>> 
>>> See here:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
>>> 
>>> 
>>> FULL_REUSE
>>> 
>>> 
>>> See here:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
>>> 
>>> New or Changed Public Interfaces
>>> 
>>> Interfaces changed
>>> 
>>> The interface of the {{ExecutionConfig}} adds the method
>>> {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
>>> {{enableObjectReuse()}} and {{disableObjectReuse()}}.
>>> 
>>> 
>>> Behavior changed
>>> 
>>> The default object passing behavior changes, meaning that it can affect the
>>> correctness of prior DataStream programs that assume the original
>>> “COPY_PER_OPERATOR” behavior.
>>> 
>>> Migration Plan and Compatibility
>>> 
>>> Interfaces
>>> 
>>> No interface migration path is needed, because the interfaces are not
>>> broken, merely some methods get deprecated.
>>> 
>>> Behavior change
>>> 
>>> Variant 1:
>>> 
>>>  - Change the behavior and state explicitly in the release notes that we
>>>    did so and which cases are affected.
>>>
>>>  - This may actually be feasible, because the affected cases are quite
>>>    pathological corner cases that only very bad implementations should
>>>    encounter (see below).
>>> 
>>> 
>>> Variant 2:
>>> 
>>>  - When users set the mode, that mode is always used.
>>>
>>>  - When the mode is not explicitly set, we follow this strategy:
>>>
>>>     - Change the CLI such that we know when users upgrade existing jobs
>>>       (the savepoint to start from has a version prior to 1.4).
>>>
>>>     - Use DEFAULT as the default for jobs that do not start from a
>>>       savepoint, or that start from a savepoint with version >= 1.4.
>>>
>>>     - Use COPY_PER_OPERATOR as the default for upgraded jobs.
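
The Variant 2 rules quoted above amount to a small decision function. The
following is only a sketch in plain Java, not Flink code: the class name, the
resolve() method, and the way the savepoint version is passed in (a
{major, minor} array, or null when the job does not start from a savepoint)
are assumptions made for illustration. Only the enum constant names come from
the FLIP.

```java
final class ReuseModeDefaultsSketch {

    // Constant names taken from the FLIP text; the enum itself is redeclared
    // here so that the sketch is self-contained.
    enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /**
     * Hypothetical helper that picks the effective mode under Variant 2.
     *
     * @param explicitMode     mode set by the user, or null if none was set
     * @param savepointVersion {major, minor} of the savepoint the job starts
     *                         from, or null if it starts without a savepoint
     */
    static ObjectReuseMode resolve(ObjectReuseMode explicitMode, int[] savepointVersion) {
        if (explicitMode != null) {
            return explicitMode; // an explicit user choice always wins
        }
        boolean upgradedJob = savepointVersion != null && isBefore14(savepointVersion);
        return upgradedJob ? ObjectReuseMode.COPY_PER_OPERATOR : ObjectReuseMode.DEFAULT;
    }

    // True for versions strictly older than 1.4.
    static boolean isBefore14(int[] version) {
        return version[0] < 1 || (version[0] == 1 && version[1] < 4);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null));                 // job without savepoint
        System.out.println(resolve(null, new int[] {1, 3}));     // upgraded job
        System.out.println(resolve(null, new int[] {1, 4}));     // savepoint from 1.4
        System.out.println(resolve(ObjectReuseMode.FULL_REUSE, new int[] {1, 2}));
    }
}
```

How the CLI would actually learn the savepoint version is left open here, as
it is in the FLIP text.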


Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Stephan Ewen <se...@apache.org>.
Hi Zhenzhong!

The difference is as follows:

DEFAULT means that at the beginning of a chain, an object is created per
record, and that object travels through the chain. The total number of
instantiated objects equals the number of records, but only one of them is
live at any given time.

FULL_REUSE is only applicable to mutable objects (say POJOs and tuples with
mutable types) and means the following: one object is created at the very
beginning of the execution, and that same object is reused in the
deserialization and pushed through the pipeline. Only a single object is
ever instantiated during the program execution. The mode relieves stress on
the GC, but requires extra care (as using and reusing mutable objects does
in every type of programming).
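
To make the difference concrete, here is a minimal sketch in plain Java. It is
not Flink code: the class, enum, and method names are made up for
illustration, the "operators" are trivial increments, and the copy between
chained operators is a simplified model of COPY_PER_OPERATOR. It only counts
how many record objects get instantiated in each mode.

```java
final class ReuseModeSketch {

    enum Mode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    // A mutable record type that counts how often it is instantiated.
    static final class Event {
        static int allocations = 0;
        long value;

        Event() {
            allocations++;
        }

        Event copy() {
            Event e = new Event();
            e.value = value;
            return e;
        }
    }

    // Pushes every input value through a chain of `chainLength` operators
    // and returns how many Event instances were created along the way.
    static int run(Mode mode, long[] input, int chainLength) {
        Event.allocations = 0;
        // FULL_REUSE: a single object is created once, before the first record.
        Event reused = (mode == Mode.FULL_REUSE) ? new Event() : null;
        for (long raw : input) {
            // "Deserialization" at the head of the chain.
            Event record = (mode == Mode.FULL_REUSE) ? reused : new Event();
            record.value = raw;
            for (int op = 0; op < chainLength; op++) {
                if (mode == Mode.COPY_PER_OPERATOR && op > 0) {
                    record = record.copy(); // copy on each hand-over in the chain
                }
                record.value += 1; // stand-in for the operator's user function
            }
        }
        return Event.allocations;
    }

    public static void main(String[] args) {
        long[] input = {10, 20, 30, 40};
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + run(mode, input, 3) + " objects");
        }
        // DEFAULT -> 4 objects
        // COPY_PER_OPERATOR -> 12 objects
        // FULL_REUSE -> 1 objects
    }
}
```

With four records and a chain of three operators, DEFAULT creates one object
per record, COPY_PER_OPERATOR adds one copy per hand-over, and FULL_REUSE
creates exactly one object for the whole run.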

Does that answer the question?

Stephan


On Tue, Jun 27, 2017 at 9:01 PM, Zhenzhong Xu <fl...@gmail.com> wrote:

> Stephan,
>
> Fully supporting this FLIP. We were originally quite surprised to observe
> the object copy behavior causing significant performance degradation for
> our massively parallel use case.
>
> In our case (and I think this SHOULD be the assumption for all streaming
> use cases), we assume object immutability for all records throughout the
> processing pipeline. As a result, I don't see a need to distinguish
> different object reuse behaviors for different (chained) operators (or, at
> the very extreme, even a need to support COPY_PER_OPERATOR, other than
> that we probably have to support it for backward compatibility). I am also
> a fan of failing fast if the user asserts incorrect assumptions.
>
> One piece of feedback on FLIP-21 itself: I am not very clear on the
> difference between the DEFAULT and FULL_REUSE enumeration values. Aren't
> they exactly the same thing in the new proposal? However, the model
> figures seem to indicate they are slightly different. Can you elaborate a
> bit more?
>
> Z.
>
>
> On 2017-06-27 11:14 (-0700), Greg Hogan <co...@greghogan.com> wrote:
> > Hi Stephan,
> >
> > Would this be an appropriate time to discuss allowing reuse to be a
> > per-operator configuration? Object reuse for chained operators has led to
> > considerable surprise for some users of the DataSet API. This came up
> > during the rework of the object reuse documentation for the DataSet API.
> > With annotations, a Function could mark whether input/iterator or
> > output/collected objects should be copied or reused.
> >
> > My distant observation is that it is safer to locally assert reuse at
> > the operator level than to assume or guarantee the safety of object reuse
> > across an entire program. It could also be handy to mix operators receiving
> > copyable objects with operators not requiring copyable objects.
> >
> > Greg
> >
> >
> > > On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> > >
> > > Hi all!
> > >
> > > I would like to propose the following FLIP:
> > >
> > > > FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> > > >
> > > > The FLIP is motivated by the fact that many users run into an unnecessary
> > > > kind of performance problem caused by an old design artifact.
> > > >
> > > > The required change should be reasonably small, and would help many users
> > > > and Flink's general standing.
> > >
> > > Happy to hear thoughts!
> > >
> > > Stephan
> > >
> > > ======================================
> > >
> > > FLIP text is below. Pictures with illustrations are only in the Wiki,
> not
> > > supported on the mailing list.
> > > ------------------------------------------------------------
> -------------------------------------
> > >
> > > Motivation
> > >
> > > The default behavior of the streaming runtime is to copy every element
> > > between chained operators.
> > >
> > > That operation was introduced for “safety” reasons, to avoid the
> number of
> > > cases where users can create incorrect programs by reusing mutable
> objects
> > > (a discouraged pattern, but possible). For example when using state
> > > backends that keep the state as objects on heap, reusing mutable
> objects
> > > can theoretically create cases where the same object is used in
> multiple
> > > state mappings.
> > >
> > > The effect is that many people that try Flink get much lower
> performance
> > > than they could possibly get. From empirical evidence, almost all users
> > > that I (Stephan) have been in touch with eventually run into this issue
> > > eventually.
> > >
> > > There are multiple observations about that design:
> > >
> > >
> > >   -
> > >
> > >   Object copies are extremely costly. While some simple copy virtually
> for
> > >   free (types reliably detected as immutable are not copied at all),
> many
> > >   real pipelines use types like Avro, Thrift, JSON, etc, which are very
> > >   expensive to copy.
> > >
> > >
> > >
> > >   -
> > >
> > >   Keyed operations currently only occur after shuffles. The operations
> are
> > >   hence the first in a pipeline and will never have a reused object
> anyways.
> > >   That means for the most critical operation, this pre-caution is
> unnecessary.
> > >
> > >
> > >
> > >   -
> > >
> > >   The mode is inconsistent with the contract of the DataSet API, which
> > >   does not copy at each step
> > >
> > >
> > >
> > >   -
> > >
> > >   To prevent these copies, users can select {{enableObjectReuse()}},
> which
> > >   is misleading, since it does not really reuse mutable objects, but
> avoids
> > >   additional copies.
> > >
> > >
> > > Proposal
> > >
> > > Summary
> > >
> > > I propose to change the default behavior of the DataStream runtime to
> be
> > > the same as the DataSet runtime. That means that new objects are
> chosen on
> > > every deserialization, and no copies are made as the objects are
> passed on
> > > along the pipelines.
> > >
> > > Details
> > >
> > > I propose to drop the execution config flag {{objectReuse}} and instead
> > > introduce an {{ObjectReuseMode}} enumeration with better control of
> what
> > > should happen. There will be three different types:
> > >
> > >
> > >   -
> > >
> > >   DEFAULT
> > >   -
> > >
> > >      This is the default in the DataSet API
> > >      -
> > >
> > >      This will become the default in the DataStream API
> > >      -
> > >
> > >      This happens in the DataStream API when {{enableObjectReuse()}} is
> > >      activated.
> > >
> > >
> > >
> > >   -
> > >
> > >   COPY_PER_OPERATOR
> > >   -
> > >
> > >      The current default in the DataStream API
> > >
> > >
> > >
> > >   -
> > >
> > >   FULL_REUSE
> > >   -
> > >
> > >      This happens in the DataSet API when {{enableObjectReuse()}} is
> > >      chosen.
> > >
> > >
> > > An illustration of the modes is as follows:
> > >
> > > DEFAULT
> > >
> > >
> > > See here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%
> 2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-
> xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> > >
> > > COPY_PER_OPERATOR
> > >
> > >
> > > See here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%
> 2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTK
> uQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-
> iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
> > >
> > >
> > > FULL_REUSE
> > >
> > >
> > > See here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%
> 2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuF
> GinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
> > > New or Changed Public Interfaces
> > >
> > > Interfaces changed
> > >
> > > The interface of the {{ExecutionConfig}} add the method
> > > {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
> > > {{enableObjectReuse()}} and {{disableObjectReuse()}}.
> > >
> > >
> > > Behavior changed
> > >
> > > The default object passing behavior changes, meaning that it can
> affect the
> > > correctness of prior DataStream programs that assume the original
> > > “COPY_PER_OPERATOR” behavior.
> > >
> > > Migration Plan and Compatibility
> > >
> > > Interfaces
> > >
> > > No interface migration path is needed, because the interfaces are not
> > > broken, merely some methods get deprecated.
> > >
> > > Behavior change
> > >
> > > Variant 1:
> > >
> > >   -
> > >
> > >   Change the behavior, make it explicit on the release notes that we
> did
> > >   that and what cases are affected.
> > >   -
> > >
> > >   This may actually be feasible, because the cases that are affected
> are
> > >   quite pathological corner cases that only very bad implementations
> should
> > >   encounter (see below)
> > >
> > >
> > > Variant 2:
> > >
> > >   -
> > >
> > >   When users set the mode, always that mode is used.
> > >   -
> > >
> > >   When the mode is not explicitly set, we follow that strategy:
> > >   -
> > >
> > >      Change the CLI such that we know when users upgrade existing jobs
> > >      (the savepoint to start from has a version prior to 1.4).
> > >      -
> > >
> > >      Use DEFAULT as the default for jobs that do not start from
> savepoint,
> > >      or that start from savepoint >= 1.4
> > >      -
> > >
> > >      Use COPY_PER_OPERATOR as the default for upgraded jobs
> >
> >
>

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Zhenzhong Xu <fl...@gmail.com>.
Stephan,

Fully supporting this FLIP. We were originally quite surprised to observe the object copy behavior causing significant performance degradation for our massively parallel use case.

In our case (and I think this SHOULD be the assumption for all streaming use cases), we assume object immutability for all records throughout the processing pipeline. As a result, I don't see a need to distinguish different object reuse behaviors for different (chained) operators (or, at the very extreme, even a need to support COPY_PER_OPERATOR, other than that we probably have to support it for backward compatibility). I am also a fan of failing fast if the user asserts incorrect assumptions.
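
A small plain-Java sketch of why the immutability assumption matters (not Flink code; all class and method names here are made up for illustration). An operator that buffers references to a reused mutable object ends up with N references to the same instance, while immutable records can be shared freely without copies.

```java
import java.util.ArrayList;
import java.util.List;

final class AliasingSketch {

    static final class MutableEvent {
        long value;
    }

    static final class ImmutableEvent {
        final long value;

        ImmutableEvent(long value) {
            this.value = value;
        }
    }

    // Buffers a reused mutable object: every buffered entry is the same
    // instance, so all entries show the value of the last record.
    static List<Long> bufferMutable(long[] input) {
        MutableEvent reused = new MutableEvent();
        List<MutableEvent> buffer = new ArrayList<>();
        for (long raw : input) {
            reused.value = raw;
            buffer.add(reused); // stores a reference, not a snapshot
        }
        List<Long> seen = new ArrayList<>();
        for (MutableEvent e : buffer) {
            seen.add(e.value);
        }
        return seen;
    }

    // Buffers immutable records: each entry keeps its own value, and no
    // defensive copy is needed when the record is handed to the next operator.
    static List<Long> bufferImmutable(long[] input) {
        List<ImmutableEvent> buffer = new ArrayList<>();
        for (long raw : input) {
            buffer.add(new ImmutableEvent(raw));
        }
        List<Long> seen = new ArrayList<>();
        for (ImmutableEvent e : buffer) {
            seen.add(e.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3};
        System.out.println(bufferMutable(input));   // [3, 3, 3]
        System.out.println(bufferImmutable(input)); // [1, 2, 3]
    }
}
```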

One piece of feedback on FLIP-21 itself: I am not very clear on the difference between the DEFAULT and FULL_REUSE enumeration values. Aren't they exactly the same thing in the new proposal? However, the model figures seem to indicate they are slightly different. Can you elaborate a bit more?

Z. 


On 2017-06-27 11:14 (-0700), Greg Hogan <co...@greghogan.com> wrote: 
> Hi Stephan,
> 
> Would this be an appropriate time to discuss allowing reuse to be a per-operator configuration? Object reuse for chained operators has led to considerable surprise for some users of the DataSet API. This came up during the rework of the object reuse documentation for the DataSet API. With annotations, a Function could mark whether input/iterator or output/collected objects should be copied or reused.
> 
> My distant observation is that it is safer to locally assert reuse at the operator level than to assume or guarantee the safety of object reuse across an entire program. It could also be handy to mix operators receiving copyable objects with operators not requiring copyable objects.
> 
> Greg
> 
> 
> > On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> > 
> > Hi all!
> > 
> > I would like to propose the following FLIP:
> > 
> > FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> > 
> > The FLIP is motivated by the fact that many users run into an unnecessary
> > kind of performance problem caused by an old design artifact.
> > 
> > The required change should be reasonably small, and would help many users
> > and Flink's general standing.
> > 
> > Happy to hear thoughts!
> > 
> > Stephan

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

Posted by Greg Hogan <co...@greghogan.com>.
Hi Stephan,

Would this be an appropriate time to discuss allowing reuse to be a per-operator configuration? Object reuse for chained operators has led to considerable surprise for some users of the DataSet API. This came up during the rework of the object reuse documentation for the DataSet API. With annotations, a Function could mark whether input/iterator or output/collected objects should be copied or reused.

My distant observation is that it is safer to locally assert reuse at the operator level than to assume or guarantee the safety of object reuse across an entire program. It could also be handy to mix operators receiving copyable objects with operators not requiring copyable objects.
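
As a rough sketch of what such a per-operator marker could look like (plain Java, not an existing Flink API; the @SafeForInputReuse annotation, the invoke() helper, and the example functions are all hypothetical names introduced only for illustration): a function class opts in to receiving the caller's object, and everything else gets a defensive copy.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.UnaryOperator;

final class PerOperatorReuseSketch {

    // Hypothetical marker: the function promises it is safe to be handed
    // the caller's object directly instead of a copy.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface SafeForInputReuse {}

    static final class Event {
        long value;

        Event(long value) {
            this.value = value;
        }

        Event copy() {
            return new Event(value);
        }
    }

    @SafeForInputReuse
    static final class Increment implements UnaryOperator<Event> {
        @Override
        public Event apply(Event e) {
            e.value += 1;
            return e;
        }
    }

    // No annotation: the runtime stays on the safe side and copies.
    static final class Doubler implements UnaryOperator<Event> {
        @Override
        public Event apply(Event e) {
            e.value *= 2;
            return e;
        }
    }

    // Stand-in for the runtime's hand-over to one operator: reuse the input
    // object only if the function class carries the marker annotation.
    static Event invoke(UnaryOperator<Event> function, Event input) {
        boolean reuse = function.getClass().isAnnotationPresent(SafeForInputReuse.class);
        return function.apply(reuse ? input : input.copy());
    }

    public static void main(String[] args) {
        Event in = new Event(5);
        Event a = invoke(new Increment(), in); // same object, mutated in place
        Event b = invoke(new Doubler(), in);   // works on a copy of `in`
        System.out.println((a == in) + " " + in.value + " " + (b == in) + " " + b.value);
        // true 6 false 12
    }
}
```

A class-level marker like this would not cover lambdas, and it says nothing about output/collected objects; both would need a separate mechanism.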

Greg


> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi all!
> 
> I would like to propose the following FLIP:
> 
> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> 
> The FLIP is motivated by the fact that many users run into an unnecessary
> kind of performance problem caused by an old design artifact.
> 
> The required change should be reasonably small, and would help many users
> and Flink's general standing.
> 
> Happy to hear thoughts!
> 
> Stephan