Posted to dev@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2016/03/04 01:54:08 UTC

RE: Guarantees for object reuse modes and documentation

Hi Gábor,

When object re-use is enabled, what happens when maps are chained?

In Cascading, when running in local mode, the tuple that one map operation outputs is immediately used as the input for the next map.

The upstream map isn't called with another input tuple until the pipelined processing of the current one is done, which prevents problems with output tuple re-use.

Is this the same model used by Flink?

Thanks for clarifying,

-- Ken

> From: Gábor Gévay
> Sent: February 20, 2016 4:04:09am PST
> To: dev@flink.apache.org
> Subject: Re: Guarantees for object reuse modes and documentation
> 
> Thanks, Ken! I was wondering how other systems handle these issues.
> 
> Fortunately, the deep copy - shallow copy problem doesn't arise in
> Flink: when we copy an object, it is always a deep copy (at least, I
> hope so :)).
> 
> Best,
> Gábor
> 
> 
> 
> 2016-02-19 22:29 GMT+01:00 Ken Krugler <kk...@transpac.com>:
>> Not sure how useful this is, but we'd run into similar issues with Cascading over the years.
>> 
>> This wasn't an issue for input data, as Cascading "locks" the Tuple such that attempts to modify it will fail.
>> 
>> And in general Hadoop always re-uses the data container being passed to operations, so you quickly learn to not cache those :)
>> 
>> When trying to re-use a Tuple as the output in an operation, things get a bit more complicated.
>> 
>> If the Tuple only contains primitive types, then there's no issue, as the (effectively) shallow copy created by the execution platform is harmless.
>> 
>> If the Tuple contains an object (e.g. a nested Tuple) then there were situations where a deep copy would need to be made before passing the Tuple to the operation's output collector.
>> 
>> For example, if the next (chained) operation was a map-side aggregator, then a shallow copy of the Tuple would be cached. If there's a non-primitive object then changes to this in the upstream operation obviously bork the cached data.
>> 
>> Net-net is that we wanted a way to find out, from inside an operation, whether we needed to make a deep copy of the output Tuple. But that doesn't exist (yet), so we have some utility code to check if a deep copy is needed (non-primitive types), and if so it auto-clones the Tuple. That isn't very efficient, but for most of our workflows we only have primitive types.
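>>
>> In case it's useful, here's a minimal sketch of that check (hypothetical: copyIfNeeded is an invented name and the immutability test is simplified; only cascading.tuple.Tuple is actual Cascading API):
>>
>> static Tuple copyIfNeeded(Tuple t) {
>>   for (int i = 0; i < t.size(); i++) {
>>     Object v = t.getObject(i);
>>     // Boxed primitives and Strings are immutable, so they're safe to share.
>>     if (v != null && !(v instanceof Number) && !(v instanceof String)
>>         && !(v instanceof Boolean)) {
>>       // Found a mutable field: clone before handing the tuple to the
>>       // output collector. Note that new Tuple(t) is only a shallow copy,
>>       // so a real implementation would recurse into nested Tuples.
>>       return new Tuple(t);
>>     }
>>   }
>>   return t; // all fields immutable, safe to re-use this instance
>> }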
>> 
>> -- Ken
>> 
>>> From: Fabian Hueske
>>> Sent: February 17, 2016 9:17:27am PST
>>> To: dev@flink.apache.org
>>> Subject: Guarantees for object reuse modes and documentation
>>> 
>>> Hi,
>>> 
>>> 
>>> 
>>> Flink's DataSet API features a configuration switch called
>>> enableObjectReuse(). If activated, Flink's runtime will create fewer
>>> objects, which results in better performance and lower garbage collection
>>> overhead. Depending on whether the switch is enabled or not, user
>>> functions may or may not perform certain operations on objects they
>>> receive from Flink or emit to Flink.
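>>>
>>> For reference, a minimal sketch of how the switch is toggled on the
>>> ExecutionConfig (standard DataSet API setup):
>>>
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>
>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> // Opt in: the runtime may now hand the same object instance to a user
>>> // function repeatedly instead of creating a fresh one per record.
>>> env.getConfig().enableObjectReuse();
>>> // The default mode can be restored with env.getConfig().disableObjectReuse();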
>>> 
>>> 
>>> 
>>> At the moment, there are quite a few open issues and discussions going on
>>> about the object reuse mode, including the JIRA issues FLINK-3333,
>>> FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.
>>> 
>>> 
>>> 
>>> IMO, the most important issue is FLINK-3333, which is about improving the
>>> documentation of the object reuse mode. The current version [1] is
>>> ambiguous and includes details about operator chaining which are hard to
>>> understand and to reason about for users. Hence it is not very clear which
>>> guarantees Flink gives for objects in user functions under which
>>> conditions. This documentation needs to be improved and I think this should
>>> happen together with the 1.0 release.
>>> 
>>> 
>>> 
>>> Greg and Gabor proposed two new versions:
>>> 
>>> 1. Greg's version [2] improves and clarifies the current documentation
>>> without significantly changing the semantics. It also discusses operator
>>> chaining, but gives more details.
>>> 2. Gabor's proposal [3] aims to make the discussion of object reuse
>>> independent of operator chaining which I think is a very good idea because
>>> it is not transparent to the user when function chaining happens. Gabor
>>> formulated four questions to pin down what users can do with, and expect
>>> from, the objects that their functions receive or emit. In order to make
>>> the answers to these questions independent of function chaining and still
>>> keep the contracts as defined by the current documentation, we have to
>>> default to rather restrictive rules. For instance, functions must always
>>> emit new object instances when object reuse is disabled (see the sketch
>>> below). These strict rules would, for example, also require
>>> DataSourceFunctions to copy all records that they receive from an
>>> InputFormat (see FLINK-3335). IMO, the strict guarantees make the
>>> disableObjectReuse mode harder to use and reason about than the
>>> enableObjectReuse mode, whereas the opposite should be the case.
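>>>
>>> To make that restriction concrete, a minimal sketch (the Metric type and
>>> its setValue method are invented for illustration; input is assumed to
>>> be a DataSet<String>):
>>>
>>> // imports: org.apache.flink.api.common.functions.MapFunction,
>>> //          org.apache.flink.api.java.DataSet
>>>
>>> // Fine under the strict rules: a fresh instance per record.
>>> DataSet<Metric> ok = input.map(new MapFunction<String, Metric>() {
>>>   @Override
>>>   public Metric map(String line) {
>>>     return new Metric(line.length()); // new object on every call
>>>   }
>>> });
>>>
>>> // What the strict rules would forbid when object reuse is disabled:
>>> DataSet<Metric> bad = input.map(new MapFunction<String, Metric>() {
>>>   private final Metric reuse = new Metric(0);
>>>   @Override
>>>   public Metric map(String line) {
>>>     reuse.setValue(line.length()); // mutates the emitted instance
>>>     return reuse;
>>>   }
>>> });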
>>> 
>>> 
>>> 
>>> I would like to suggest a third option. Like Gabor, I think the rules
>>> should be independent of function chaining, and I would like to break
>>> them down into a handful of easy rules. However, I think we should
>>> loosen the guarantees for user functions under disableObjectReuse mode
>>> a bit.
>>> 
>>> Right now, the documentation states that under disableObjectReuse mode,
>>> input objects are not changed across function calls. Hence, users can
>>> remember these objects across function calls and their values will not
>>> change. I propose to give this guarantee only within function calls and
>>> only for objects which are not emitted. Hence, this rule only applies to
>>> functions that can consume multiple values through an iterator, such as
>>> GroupReduce, CoGroup, or MapPartition. In disableObjectReuse mode, these
>>> functions are allowed to remember the values, e.g., in a collection, and
>>> their values will not change while the iterator is forwarded. Once the
>>> function call returns, the values might change. Since functions with
>>> iterators cannot be directly chained, it will be safe to emit the same
>>> object instance several times (hence FLINK-3335 would become invalid).
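>>>
>>> For illustration, a minimal sketch of the pattern this rule is meant to
>>> permit (disableObjectReuse mode assumed):
>>>
>>> import java.util.ArrayList;
>>> import java.util.Collections;
>>> import java.util.Comparator;
>>> import java.util.List;
>>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class SortedEmit implements
>>>     GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
>>>   @Override
>>>   public void reduce(Iterable<Tuple2<String, Integer>> values,
>>>       Collector<Tuple2<String, Integer>> out) {
>>>     // Safe under the proposed rule: buffer the input objects while the
>>>     // iterator is forwarded, within this single function call.
>>>     List<Tuple2<String, Integer>> buffer = new ArrayList<>();
>>>     for (Tuple2<String, Integer> v : values) {
>>>       buffer.add(v); // no defensive copy needed inside the call
>>>     }
>>>     Collections.sort(buffer, new Comparator<Tuple2<String, Integer>>() {
>>>       @Override
>>>       public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
>>>         return a.f1.compareTo(b.f1);
>>>       }
>>>     });
>>>     for (Tuple2<String, Integer> v : buffer) {
>>>       out.collect(v);
>>>     }
>>>     // But the buffered objects must not be kept across calls: their
>>>     // contents may change once reduce() returns.
>>>   }
>>> }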
>>> 
>>> 
>>> 
>>> The difference to the current guarantees is that input objects become
>>> invalid after the function call has returned. Since the
>>> disableObjectReuse mode was mainly introduced to allow for caching
>>> objects across iterator calls within a GroupReduceFunction or
>>> CoGroupFunction (not across function calls), I think this is a
>>> reasonable restriction.
>>> 
>>> 
>>> 
>>> tl;dr;
>>> 
>>> If we want to make the documentation of object reuse independent of
>>> chaining, we have to
>>>
>>> - EITHER give tighter guarantees / be more restrictive than now and
>>> update the internals, which might lead to performance regressions. This
>>> would be in line with the current documentation but would somewhat
>>> defeat the purpose of the disableObjectReuse mode, IMO.
>>>
>>> - OR give weaker guarantees, which breaks with the current
>>> documentation, but would not affect performance and would be easier to
>>> follow for users, IMO.
>>> 
>>> 
>>> Greg and Gabor, please correct me if I did not get your points right or
>>> missed something.
>>> 
>>> What do others think?
>>> 
>>> 
>>> Fabian
>>> 
>>> 
>>> 
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
>>> 
>>> [2]
>>> https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151
>>> 
>>> [3]
>>> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr






RE: Guarantees for object reuse modes and documentation

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

yes, that's exactly how it works in Flink as well.

The object reuse mode does not affect how chaining is done.
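So with two chained maps, the following pattern is safe when object reuse is enabled: the object emitted by the first map is handed to the second map before the first one sees its next record. A minimal sketch (the input DataSet and the usual imports are assumed):

// imports: org.apache.flink.api.common.functions.MapFunction,
//          org.apache.flink.api.java.DataSet,
//          org.apache.flink.api.java.tuple.Tuple2

// given: DataSet<String> lines
DataSet<Integer> lengths = lines
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
      // Re-using this output holder is fine here: the chained successor
      // processes each emitted object before map() sees the next record.
      private final Tuple2<String, Integer> reuse = new Tuple2<>("", 0);
      @Override
      public Tuple2<String, Integer> map(String line) {
        reuse.f0 = line;
        reuse.f1 = line.length();
        return reuse;
      }
    })
    .map(new MapFunction<Tuple2<String, Integer>, Integer>() {
      @Override
      public Integer map(Tuple2<String, Integer> t) {
        return t.f1; // consumed immediately; must not be cached here
      }
    });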

Best, Fabian


