Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2015/10/02 17:53:49 UTC

Rethink the "always copy" policy for streaming topologies

Hi all!

Now that we are approaching the next release, I wanted to make sure we
finalize the decision on this point, because it would be nice not to break
the behavior of the system afterwards.

Right now, when tasks are chained together, the system always copies the
elements between the different tasks in the same chain.
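
To make the two policies concrete, here is a minimal sketch in plain Java of the hand-over between two chained operators. ChainedOutput, its copier parameter, and the ChainDemo helper are hypothetical names used only for illustration, not Flink's runtime classes; the copier stands in for whatever copy routine the element type's serializer provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model of the hand-over between two chained operators.
// Not Flink's runtime classes; names are illustrative only.
final class ChainedOutput<T> implements Consumer<T> {
    private final Consumer<T> downstream;   // next operator in the chain
    private final UnaryOperator<T> copier;  // stands in for a serializer's copy routine
    private final boolean copyElements;     // true = "always copy", false = pass references

    ChainedOutput(Consumer<T> downstream, UnaryOperator<T> copier, boolean copyElements) {
        this.downstream = downstream;
        this.copier = copier;
        this.copyElements = copyElements;
    }

    @Override
    public void accept(T element) {
        // Either hand over a private copy or the very same object reference.
        downstream.accept(copyElements ? copier.apply(element) : element);
    }
}

final class ChainDemo {
    private ChainDemo() {}

    // Pushes all inputs through one chain link and returns what the next operator received.
    static <T> List<T> pushThroughChain(List<T> inputs, UnaryOperator<T> copier, boolean copy) {
        List<T> received = new ArrayList<>();
        ChainedOutput<T> link = new ChainedOutput<>(received::add, copier, copy);
        for (T element : inputs) {
            link.accept(element);
        }
        return received;
    }
}
```

With copying enabled, the next operator receives an equal but distinct object; with copying disabled, it receives the identical reference, which is cheaper but lets both operators see each other's mutations.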

I think this policy was established under the assumption that copies cost
nothing, based on our own test examples, which mainly use immutable types
like Strings, boxed primitives, etc.

In practice, a lot of data types are actually quite expensive to copy.

For example, a rather common data type in the analysis of events from web
sources is the JSON object.
Flink treats this as a generic type. Depending on its concrete
implementation, Kryo may have to perform a serialization copy, which means
encoding into bytes (JSON encoding, charset encoding) and decoding again.
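
A serialization copy amounts to a full encode/decode round trip for every element. The sketch below uses java.io serialization as a stand-in for Kryo and a HashMap as a stand-in for a JSON object; both substitutions are assumptions made to keep the example self-contained, and SerializationCopy is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper: copies an object by serializing it to bytes and reading it back.
// java.io serialization stands in for Kryo; the cost pattern (full encode + decode
// for every single element) is what matters here.
final class SerializationCopy {
    private SerializationCopy() {}

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyViaBytes(T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value); // encode: object graph -> bytes
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (T) in.readObject(); // decode: bytes -> brand-new object graph
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization copy failed", e);
        }
    }
}
```

The result is equal to the input but shares no objects with it, and producing it required allocating and filling a byte buffer, which is the per-element overhead the always-copy policy pays for such types.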

This has a massive impact on the out-of-the-box performance of the system.
Given that, I was wondering whether we should set the default policy to "not
copying".

That is basically the behavior of the batch API, and so far there has never
been an issue with it (i.e., people running into the trap of overwritten
mutable elements).
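
The "overwritten mutable elements" trap can be reproduced without Flink. In this hypothetical sketch, a source reuses a single mutable Counter instance for every record, and a downstream step keeps references to what it receives; ReuseTrap and Counter are illustrative names, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reproduction of the "overwritten mutable elements" trap.
final class ReuseTrap {
    private ReuseTrap() {}

    // Any mutable user type would do.
    static final class Counter {
        int value;

        Counter(int value) {
            this.value = value;
        }
    }

    // The "source" reuses one Counter for all n records; the "downstream" step
    // keeps what it receives. Returns the values the downstream step ends up holding.
    static List<Integer> collect(int n, boolean copyOnHandOver) {
        Counter reused = new Counter(0);
        List<Counter> kept = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            reused.value = i; // the source overwrites the same instance
            kept.add(copyOnHandOver ? new Counter(reused.value) : reused);
        }
        List<Integer> values = new ArrayList<>();
        for (Counter c : kept) {
            values.add(c.value);
        }
        return values;
    }
}
```

Without a copy, collect(3, false) returns [3, 3, 3], because all three kept references point at the same object; with a copy, collect(3, true) returns [1, 2, 3].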

What do you think?

Stephan

Re: Rethink the "always copy" policy for streaming topologies

Posted by Stephan Ewen <se...@apache.org>.
I don't recall that the default policy was changed.

If we change it, it would be a good idea to do so for 0.10, or for 1.0 at
the latest.

One thing I realized is that, to get predictable behavior with chaining, we
should drop the special case of chaining at parallelism 1 (meaning that
shuffle operations get chained when both sender and receiver have
parallelism 1). It leads to different chaining behavior at different
parallelisms, which can easily cause confusion when debugging a program.
Parallelism 1 with repartitioning operators is probably mostly a debug
setup anyway.

On Sat, Oct 24, 2015 at 6:35 PM, Gyula Fóra <gy...@gmail.com> wrote:


Re: Rethink the "always copy" policy for streaming topologies

Posted by Gyula Fóra <gy...@gmail.com>.
Hey guys,

Have we disabled the default input copying after all? I don't remember
seeing a Jira or PR for this (maybe I just missed it).

And if not, do we want this in the 0.10 release?

Cheers,
Gyula

On Fri, Oct 2, 2015 at 7:57 PM, Till Rohrmann <tr...@apache.org> wrote:


Re: Rethink the "always copy" policy for streaming topologies

Posted by Till Rohrmann <tr...@apache.org>.
Do we know what kind of impact the non-reuse policy has? Maybe the
serialization overhead is subsumed by other effects.

But in general I'm OK with changing the default to not copying. We just
have to document this feature properly.

On Oct 2, 2015 6:31 PM, "Maximilian Michels" <mx...@apache.org> wrote:


Re: Rethink the "always copy" policy for streaming topologies

Posted by Maximilian Michels <mx...@apache.org>.
+1 Good idea. I think we can save quite a few CPU cycles by not copying
records.

> That is basically the behavior of the batch API, and there has so far never
> been an issue with that (people running into the trap of overwritten
> mutable elements).


As far as I know, this is only the case for chained operators?

On Fri, Oct 2, 2015 at 6:15 PM, Matthias J. Sax <mj...@apache.org> wrote:


Re: Rethink the "always copy" policy for streaming topologies

Posted by "Matthias J. Sax" <mj...@apache.org>.
+1 for disabling copying by default


On 10/02/2015 05:53 PM, Stephan Ewen wrote:


Re: Rethink the "always copy" policy for streaming topologies

Posted by Stephan Ewen <se...@apache.org>.
@Martin:

I think you were a user of the batch API before we made the non-reuse mode
the default.
Now, when you use a GroupReduceFunction, a MapPartitionFunction, or similar,
you don't need to do any cloning or copying. All functions that receive
groups always get fresh elements.

This chaining issue concerns only MapFunction (and FlatMapFunction)
implementations in which users keep lists to remember elements across
invocations of the function.
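
For such a stateful map function, the usual remedy is a defensive copy of every element that is kept beyond a single invocation. The sketch below models this outside Flink: WindowingMapper is a hypothetical class, and the copier argument stands in for whatever copy routine fits the element type (for example, a copy constructor).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a user map function that remembers elements across
// invocations. If the upstream operator may reuse objects, every element that is
// kept beyond the current call must be copied first.
final class WindowingMapper<T> {
    private final int size;
    private final UnaryOperator<T> copier; // e.g. a copy constructor
    private final List<T> remembered = new ArrayList<>();

    WindowingMapper(int size, UnaryOperator<T> copier) {
        this.size = size;
        this.copier = copier;
    }

    // Called once per element; returns the last `size` elements seen so far.
    List<T> map(T element) {
        remembered.add(copier.apply(element)); // defensive copy before storing
        if (remembered.size() > size) {
            remembered.remove(0); // drop the oldest element
        }
        return new ArrayList<>(remembered);
    }
}
```

If the upstream reuses one StringBuilder for "a" and then "b", a mapper built with sb -> new StringBuilder(sb) remembers "a" and "b", while one built with sb -> sb (no copy) remembers "b" twice.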


On Fri, Oct 2, 2015 at 6:27 PM, Martin Neumann <mn...@sics.se> wrote:


Re: Rethink the "always copy" policy for streaming topologies

Posted by Martin Neumann <mn...@sics.se>.
It seems I'm one of the few people who run into the mutable-elements trap
in the batch API from time to time. At the moment I always clone when I'm
not 100% sure, to avoid hunting bugs later. So far I was happy to learn that
this is not a problem in streaming, but that's just me.

When working with groupBy and partition functions, it's easy to forget that
there is one object per operator, not per partition. So if you write your
code assuming that each partition is a separate object, reducing at the
operator level becomes really annoying.
Since the mapping between partitions and operators is usually hidden, this
makes debugging harder, especially when the test data produces a single
partition per operator and the real deployment does not.
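
The "one object per operator, not per partition" effect can be illustrated with a hypothetical sketch in which a single function object receives records from two groups. A plain field accumulates across both groups, while keeping the state in a map keyed by group avoids the mix-up; neither class is a Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: one function object per (parallel) operator instance
// receives records from several groups, not just from one.
final class PerOperatorState {
    private PerOperatorState() {}

    // Written as if this object only ever saw one group: the field mixes all groups.
    static final class NaiveSum {
        private int sum = 0;

        int add(String group, int value) {
            sum += value; // `group` is ignored, which is exactly the bug
            return sum;
        }
    }

    // Keeps one running sum per group inside the single operator-level object.
    static final class KeyedSum {
        private final Map<String, Integer> sums = new HashMap<>();

        int add(String group, int value) {
            return sums.merge(group, value, Integer::sum);
        }
    }
}
```

After adding 1 to group "a", 10 to group "b", and 1 to group "a" again, the naive version reports 12 for "a", while the keyed version reports 2. A test that happens to route only one group to each operator instance would not reveal the difference.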

*To summarize:*
I'm not against reusing objects as long as there is something that helps
avoid the pitfalls. This could be coding guidelines, debugging tools, or best
practices.
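
As one example of such a debugging aid, a test harness could wrap a downstream consumer and count how often the same object instance arrives more than once, which reveals upstream object reuse. ReuseDetector is a hypothetical class, not part of Flink; it compares by identity rather than equals() and keeps references to every element it sees, so it only suits small test inputs.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical debugging aid (not part of Flink): wraps a downstream consumer and
// counts how often an already-seen object instance is handed over again.
final class ReuseDetector<T> implements Consumer<T> {
    private final Consumer<T> downstream;
    // Identity-based set: two equal but distinct objects count as different instances.
    private final Set<T> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    private int reusedInstances = 0;

    ReuseDetector(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void accept(T element) {
        if (!seen.add(element)) {
            reusedInstances++; // same reference arrived before: upstream reuses objects
        }
        downstream.accept(element);
    }

    int reusedInstances() {
        return reusedInstances;
    }
}
```

Feeding the same StringBuilder twice and then a fresh one yields a count of 1; a non-zero count tells the developer that any references kept downstream need defensive copies.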


On Fri, Oct 2, 2015 at 5:53 PM, Stephan Ewen <se...@apache.org> wrote:
