Posted to user@beam.apache.org by Manu Zhang <ow...@gmail.com> on 2016/11/01 01:37:24 UTC

Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

I'm a bit confused here because neither of them requires the same type for
input and output. Also, the Javadoc of Globally says "It is common for
{@code InputT == OutputT}, but not required". If associativity and
commutativity are expected, why don't they have restrictions like
Combine.perKey(SerializableFunction)?

I understand the motive and requirement behind Combine functions. I'm
asking more about the consistency of the user interface.
By the way, it's hard to know what Combine.Globally does from the name but
that discussion should be put in another thread.

Thanks for your patience here.

Manu

On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:

> GlobalCombineFn and PerKeyCombineFn still expect an associative and
> commutative function when accumulating.
> GlobalCombineFn is shorthand for assigning everything to a single key,
> doing the combine, and then discarding the key and extracting the single
> output.
> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
> modify the accumulation in any way.
>
> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the same
> requirements?
>
> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>
> For it to be considered a combiner, the function needs to be associative
> and commutative.
>
> From an API perspective, it would be easy to offer a
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
> people in the data processing world expect a combine to be
> parallelized/optimized in this way, so exposing such a method would be
> dangerous because it would break users' expectations. From the design
> perspective it is therefore a hard requirement. If PCollections ever become
> ordered or gain other properties, these requirements may loosen, but that
> seems unlikely in the short term.
>
> At this point, I think you're looking for a MapElements to which you pass a
> SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
> OutputT>> which delegates to a SerializableFunction<Iterable<InputT>,
> OutputT> should be trivial.
>
>
> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
> Thanks for the thorough explanation. I see the benefits of such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's the monoid
> rule) but are possible and easier to write with
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
> difficult to provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> Combine.perKey takes a single SerializableFunction which knows how to
> convert from Iterable<V> to V.
>
> It turns out that many runners implement optimizations which allow them to
> run the combine operation across several machines to parallelize the work
> and potentially reduce the amount of data they store during a GBK.
> Such an optimization actually requires three functions:
> InputT -> AccumT: Creates the intermediate representation which
> allows for associative combining
> Iterable<AccumT> -> AccumT: Performs the actual combining
> AccumT -> OutputT: Extracts the output
>
> In the case of Combine.perKey with a SerializableFunction, you're providing
> Iterable<AccumT> -> AccumT and the other two functions are the
> identity functions.
>
> Supporting a Combine.perKey that goes from Iterable<InputT> -> OutputT
> would require the whole computation to occur on a single machine, removing
> the parallelization benefits that runners provide. In almost all cases
> that is not a good idea.
>
> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input and
> > output to be of the same type while `Combine.PerKey` doesn't have this
> > restriction.
> >
> > Thanks,
> > Manu
> >
>
>
>
>
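
As an illustration of the three functions Lukasz describes in the quoted
thread above, here is a minimal plain-Java sketch of a mean computation. It
is not Beam code: the class `MeanCombineSketch`, the accumulator `SumCount`,
and the method names `init`, `merge`, `extract`, and `combinePartition` are
all hypothetical names chosen for this example. It only aims to show that
splitting the input across "machines" and merging the partial accumulators
gives the same answer as combining everything in one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the three functions; none of these names come from Beam.
class MeanCombineSketch {

  // Hypothetical accumulator: a running sum and a count of inputs seen.
  static final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // InputT -> AccumT: lift one input into the intermediate representation.
  static SumCount init(int input) {
    return new SumCount(input, 1);
  }

  // Iterable<AccumT> -> AccumT: the merge, which is associative and commutative.
  static SumCount merge(Iterable<SumCount> accumulators) {
    long sum = 0;
    long count = 0;
    for (SumCount acc : accumulators) {
      sum += acc.sum;
      count += acc.count;
    }
    return new SumCount(sum, count);
  }

  // AccumT -> OutputT: extract the final result (0.0 for an empty input).
  static double extract(SumCount acc) {
    return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
  }

  // Simulates what one worker would do with its share of the values.
  static SumCount combinePartition(List<Integer> partition) {
    List<SumCount> lifted = new ArrayList<>();
    for (int value : partition) {
      lifted.add(init(value));
    }
    return merge(lifted);
  }

  public static void main(String[] args) {
    // All values combined on one "machine".
    SumCount single = combinePartition(Arrays.asList(1, 2, 3, 4));

    // The same values split across two "machines", merged in swapped order.
    SumCount left = combinePartition(Arrays.asList(1, 2));
    SumCount right = combinePartition(Arrays.asList(3, 4));
    SumCount merged = merge(Arrays.asList(right, left));

    System.out.println(extract(single)); // 2.5
    System.out.println(extract(merged)); // 2.5
  }
}
```

Note that the input type (int), the accumulator type (SumCount), and the
output type (double) all differ here. Only the merge step has to map a type
to itself, which is the step a SerializableFunction passed to Combine.perKey
supplies.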

Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

Posted by Manu Zhang <ow...@gmail.com>.
Thanks, guys. My original confusion was this: if the API allows me to have
different input and output types, why not make that easier? It's clear now.

Do you think it would be better to hide some interfaces we don't expect users
to use? The Combine API has tempted me to do more than it is meant for ;)




Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
On Mon, Oct 31, 2016 at 8:39 PM, Kenneth Knowles <kl...@google.com.invalid> wrote:
> Manu, I think your critique about user interface clarity is valid.
> CombineFn conflates a few operations and is not that clear about what it is
> doing or why. You seem to be concerned about CombineFn versus
> SerializableFunction constructors for the Combine family of transforms. I
> thought I'd respond from my own perspective, in case it is helpful. It is
> mostly the same things that Luke has said. Let's ignore keys. I don't think
> they change things much.
>
> As you seem to already understand, a CombineFn is a convenient collapsed
> representation of three functions:
>
>     init : InputT -> AccumT
>     combiner: (AccumT, AccumT) -> AccumT
>     extract: AccumT -> OutputT
>
> And the real semantics:
>
>     MapElements.via(init)
>     Combine.via(combiner)
>     MapElements.via(extract)
>
> For starters, "associative" is not even a well-typed word to use unless
> input type and output type are the same. So it is `combiner` that needs to
> be associative and commutative. Sometimes `combiner` also has an identity
> element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
> things here (the latter is never meaningfully used). When we say a
> CombineFn has to be "associative" and "commutative" we just mean that it
> can be factored into these methods.
>
> So the SerializableFunction just needs to be factorable into these methods,
> too, like Luke said. Pragmatically, if we only have a
> SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
> do hierarchical combines (can't feed the output of one layer into the next
> layer), so associativity is irrelevant and it might as well be a
> MapElements. So it only makes sense to allow
> SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
> binary function would make sense for lambdas, etc.
>
> Here are some reasons for the particular design of CombineFn that actually
> should be called out:
>
>  - It is a major efficiency gain to mutate the accumulator.
>  - Usually `init` is trivial and best to inline, hence addInput(InputT,
> AccumT)

I would add that often the map InputT -> AccumT is *non-trivial*, as
is AccumT -> AccumT, so AccumT + InputT -> AccumT is preferable (both
for efficiency and code simplicity) for anything beyond trivial
combiners. FlumeJava, a predecessor to Beam that we took many lessons
from, had an explicit init rather than addInput and that turned out to
be a drawback when implementing CombineFns.
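
To make the contrast between an explicit init and an addInput-style method
concrete, here is a small plain-Java sketch of a distinct count. It is not
taken from Beam or FlumeJava: `DistinctCountSketch` and its methods are
hypothetical names for this example, and only the shapes InputT -> AccumT
versus AccumT + InputT -> AccumT follow the discussion.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch; the method names mirror the discussion, not a real SDK.
class DistinctCountSketch {

  // Style 1: explicit init (InputT -> AccumT), to be followed by a merge.
  // Every input allocates its own one-element set before it can be merged.
  static Set<String> init(String input) {
    Set<String> acc = new HashSet<>();
    acc.add(input);
    return acc;
  }

  // (AccumT, AccumT) -> AccumT: merges right into left, mutating left.
  static Set<String> merge(Set<String> left, Set<String> right) {
    left.addAll(right);
    return left;
  }

  // Style 2: addInput (AccumT + InputT -> AccumT).
  // The input is folded straight into the existing accumulator.
  static Set<String> addInput(Set<String> acc, String input) {
    acc.add(input);
    return acc;
  }

  static int countWithInit(List<String> inputs) {
    Set<String> acc = new HashSet<>();
    for (String input : inputs) {
      acc = merge(acc, init(input)); // one temporary set per input
    }
    return acc.size();
  }

  static int countWithAddInput(List<String> inputs) {
    Set<String> acc = new HashSet<>();
    for (String input : inputs) {
      acc = addInput(acc, input); // no temporary accumulator
    }
    return acc.size();
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("a", "b", "a", "c", "b");
    System.out.println(countWithInit(words));     // 3
    System.out.println(countWithAddInput(words)); // 3
  }
}
```

Both styles give the same answer. The difference is that the init style
creates and discards a temporary accumulator for each element, which is the
kind of overhead the addInput shape avoids.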

>  - With `compact` we allow multiple physical representations of the same
> semantic accumulator, and a hook to switch between them
>  - And it is hard to take the user on the journey from the real
> reasons behind it to the particular Java interface
>
> Note also that CombineWithContext allows side inputs, which complicates the
> formalities somewhat but doesn't change the intuition.
>
> Kenn
>
> On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang <ow...@gmail.com> wrote:
>
>> I'm a bit confused here because neither of them requires the same type for
>> input and output. Also, the Javadoc of Globally says "It is common for
>> {@code InputT == OutputT}, but not required". If associativity and
>> commutativity are expected, why don't they have restrictions like
>> Combine.perKey(SerializableFunction)?
>>
>> I understand the motive and requirement behind Combine functions. I'm
>> asking more about the consistency of the user interface.
>> By the way, it's hard to know what Combine.Globally does from the name but
>> that discussion should be put in another thread.
>>
>> Thanks for your patience here.
>>
>> Manu
>>
>> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> GlobalCombineFn and PerKeyCombineFn still expect an associative and
>> commutative function when accumulating.
>> GlobalCombineFn is shorthand for assigning everything to a single key,
>> doing the combine, and then discarding the key and extracting the single
>> output.
>> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
>> modify the accumulation in any way.
>>
>> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
>> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the same
>> requirements?
>>
>> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> From an API perspective, it would be easy to offer a
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
>> people in the data processing world expect a combine to be
>> parallelized/optimized in this way, so exposing such a method would be
>> dangerous because it would break users' expectations. From the design
>> perspective it is therefore a hard requirement. If PCollections ever become
>> ordered or gain other properties, these requirements may loosen, but that
>> seems unlikely in the short term.
>>
>> At this point, I think you're looking for a MapElements to which you pass a
>> SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
>> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
>> OutputT>> which delegates to a SerializableFunction<Iterable<InputT>,
>> OutputT> should be trivial.
>>
>>
>> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>> Thanks for the thorough explanation. I see the benefits of such a
>> function.
>> My follow-up question is whether this is a hard requirement.
>> There are computations that don't satisfy this (I think it's the monoid
>> rule) but are possible and easier to write with
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
>> difficult to provide an underlying CombineFn.
>>
>>
>> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable<V> to V.
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> Such an optimization actually requires three functions:
>> InputT -> AccumT: Creates the intermediate representation which
>> allows for associative combining
>> Iterable<AccumT> -> AccumT: Performs the actual combining
>> AccumT -> OutputT: Extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable<AccumT> -> AccumT and the other two functions are the
>> identity functions.
>>
>> Supporting a Combine.perKey that goes from Iterable<InputT> -> OutputT
>> would require the whole computation to occur on a single machine, removing
>> the parallelization benefits that runners provide. In almost all cases
>> that is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input and
>> > output to be of the same type while `Combine.PerKey` doesn't have this
>> > restriction.
>> >
>> > Thanks,
>> > Manu
>> >
>>
>>
>>
>>
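
The wrapper Lukasz suggests in the quoted thread (delegating from a function
over a key and its grouped values to a function over the values alone) might
look roughly like the sketch below. It uses plain-Java stand-ins so that it
runs without the Beam SDK: `Map.Entry` plays the role of KV and
`java.util.function.Function` plays the role of SerializableFunction. The
class `GroupedValuesWrapperSketch` and the methods `perKey` and `median` are
hypothetical names for this example. The median is used because it needs to
see all values for a key at once.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-ins only: Map.Entry plays the role of KV, and Function the role of
// SerializableFunction. Neither is the Beam type.
class GroupedValuesWrapperSketch {

  // Wraps Iterable<InputT> -> OutputT so that it applies to (key, values)
  // pairs, passing the key through unchanged.
  static <K, InputT, OutputT>
      Function<Map.Entry<K, Iterable<InputT>>, Map.Entry<K, OutputT>> perKey(
          Function<Iterable<InputT>, OutputT> fn) {
    return kv -> new SimpleImmutableEntry<>(kv.getKey(), fn.apply(kv.getValue()));
  }

  // Upper median of a non-empty group; it sorts a copy of all the values.
  static Integer median(Iterable<Integer> values) {
    List<Integer> sorted = new ArrayList<>();
    for (Integer v : values) {
      sorted.add(v);
    }
    Collections.sort(sorted);
    return sorted.get(sorted.size() / 2);
  }

  public static void main(String[] args) {
    Function<Map.Entry<String, Iterable<Integer>>, Map.Entry<String, Integer>> medianPerKey =
        perKey(GroupedValuesWrapperSketch::median);

    Iterable<Integer> values = Arrays.asList(5, 1, 3);
    Map.Entry<String, Integer> out =
        medianPerKey.apply(new SimpleImmutableEntry<>("k", values));

    System.out.println(out.getKey() + " -> " + out.getValue()); // k -> 3
  }
}
```

In a pipeline, a function of this shape would be applied after the values
have been grouped by key, so each group is processed in one place and none
of the combiner optimizations discussed in this thread apply.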

Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Oct 31, 2016 at 8:39 PM, Kenneth Knowles <kl...@google.com.invalid> wrote:
> Manu, I think your critique about user interface clarity is valid.
> CombineFn conflates a few operations and is not that clear about what it is
> doing or why. You seem to be concerned about CombineFn versus
> SerializableFunction constructors for the Combine family of transforms. I
> thought I'd respond from my own perspective, in case it is helpful. It is
> mostly the same things that Luke has said. Let's ignore keys. I don't think
> they change things much.
>
> As you seem to already understand, a CombineFn is a convenient collapsed
> representation of three functions:
>
>     init : InputT -> AccumT
>     combiner: (AccumT, AccumT) -> AccumT
>     extract: AccumT -> OutputT
>
> And the real semantics:
>
>     MapElements.via(init)
>     Combine.via(combiner)
>     MapElements.via(extract)
>
> For starters, "associative" is not even a well-typed word to use unless
> input type and output type are the same. So it is `combiner` that needs to
> be associative and commutative. Sometimes `combiner` also has an identity
> element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
> things here (the latter is never meaningfully used). When we say a
> CombineFn has to be "associative" and "commutative" we just mean that it
> can be factored into these methods.
>
> So the SerializableFunction just needs to be factorable into these methods,
> too, like Luke said. Pragmatically, if we only have a
> SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
> do hierarchical combines (can't feed the output of one layer into the next
> layer), so associativity is irrelevant and it might as well be a
> MapElements. So it only makes sense to allow
> SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
> binary function would make sense for lambdas, etc.
>
> Here are some reasons for the particular design of CombineFn that actually
> should be called out:
>
>  - It is a major efficiency gain to mutate the accumulator.
>  - Usually `init` is trivial and best to inline, hence addInput(InputT,
> AccumT)

I would add that often the map InputT -> AccumT is *non-trivial*, as
is AccumT -> AccumT, so AccumT + Input -> AccumT is preferable (both
for efficiency and code simplicity) for anything beyond trivial
combiners. FlumeJava, a predecessor to Beam that we took many lessons
from, had an explicit init rather than addInput and that turned out to
be a drawback when implementing CombineFns.

>  - With `compact` we allow multiple physical representations of the same
> semantic accumulator, and a hook to switch between them
>  - And it is hard to take the user through the journey from the real
> reasons behind it and the particular Java interface
>
> Note also that CombineWithContext allows side inputs, which complicates the
> formalities somewhat but doesn't change the intuition.
>
> Kenn
>
> On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang <ow...@gmail.com> wrote:
>
>> I'm a bit confused here because neither of them requires the same type of
>> input and output. Also, the Javadoc of Globally says "It is common for
>> {@code InputT == OutputT}, but not required". If associativity and
>> commutativity are expected, why don't they have restrictions like
>> Combine.perKey(SerializableFunction)?
>>
>> I understand the motive and requirement behind Combine functions. I'm more
>> asking about the user interface consistency.
>> By the way, it's hard to know what Combine.Globally does from the name but
>> that discussion should be put in another thread.
>>
>> Thanks for your patience here.
>>
>> Manu
>>
>> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> GlobalCombineFn and PerKeyCombineFn still expect an associative and
>> commutative function when accumulating.
>> GlobalCombineFn is shorthand for assigning everything to a single key,
>> doing the combine, and then discarding the key and extracting the single
>> output.
>> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
>> modify the accumulation in any way.
>>
>> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
>> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the
>> same requirements?
>>
>> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> The issue is that from an API perspective it would be easy to have a
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
>> people in the data processing world expect that this
>> parallelization/optimization is performed, so exposing such a method
>> would be dangerous: it would break users' expectations. From the
>> design perspective it is therefore a hard requirement. If PCollections
>> ever become ordered or gain other properties, these requirements may
>> loosen, but that seems unlikely in the short term.
>>
>> At this point, I think you're looking for a MapElements to which you
>> pass a SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
>> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
>> OutputT>> which can delegate to a SerializableFunction<Iterable<InputT>,
>> OutputT> should be trivial.
>>
>>
>> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>> Thanks for the thorough explanation. I see the benefits for such a
>> function.
>> My follow-up question is whether this is a hard requirement.
>> There are computations that don't satisfy this (I think it's the monoid
>> rule) but are possible and easier to write with
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
>> difficult to provide an underlying CombineFn.
>>
>>
>> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable<V> to V.
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> To be able to do such an optimization, you actually need
>> three functions:
>> InputT -> AccumulatorT: Creates the intermediate representation which
>> allows for associative combining
>> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
>> AccumulatorT -> OutputT: Extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
>> identity functions.
>>
>> Supporting a Combine.perKey that goes from Iterable<InputT> -> OutputT
>> would require the whole combine to run on a single machine, removing the
>> parallelization benefits that runners provide. In almost all cases that
>> is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <ow...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and
>> > output to be of the same type while `Combine.PerKey` doesn't have this
>> > restriction.
>> >
>> > Thanks,
>> > Manu
>> >
>>
>>
>>
>>

Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

Posted by Kenneth Knowles <kl...@google.com>.
Manu, I think your critique about user interface clarity is valid.
CombineFn conflates a few operations and is not that clear about what it is
doing or why. You seem to be concerned about CombineFn versus
SerializableFunction constructors for the Combine family of transforms. I
thought I'd respond from my own perspective, in case it is helpful. It is
mostly the same things that Luke has said. Let's ignore keys. I don't think
they change things much.

As you seem to already understand, a CombineFn is a convenient collapsed
representation of three functions:

    init : InputT -> AccumT
    combiner: (AccumT, AccumT) -> AccumT
    extract: AccumT -> OutputT

And the real semantics:

    MapElements.via(init)
    Combine.via(combiner)
    MapElements.via(extract)
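
As a rough illustration of that factoring, here is a plain-Java sketch
that computes a mean with java.util.stream rather than the Beam SDK. The
names `MeanFactoring`, `SumCount`, `INIT`, `COMBINER` and `EXTRACT` are
made up for this example; only the shape of the three functions comes
from the description above.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

final class MeanFactoring {
  /** Hypothetical AccumT: a running sum and element count. */
  static final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // init : InputT -> AccumT
  static final Function<Double, SumCount> INIT = x -> new SumCount(x, 1);

  // combiner : (AccumT, AccumT) -> AccumT, associative and commutative
  static final BinaryOperator<SumCount> COMBINER =
      (a, b) -> new SumCount(a.sum + b.sum, a.count + b.count);

  // extract : AccumT -> OutputT
  static final Function<SumCount, Double> EXTRACT =
      acc -> acc.count == 0 ? Double.NaN : acc.sum / acc.count;

  /** Mirrors map(init), then combine, then map(extract). */
  static double mean(List<Double> inputs) {
    // SumCount(0, 0) is the identity element of COMBINER.
    SumCount total = inputs.stream().map(INIT).reduce(new SumCount(0, 0), COMBINER);
    return EXTRACT.apply(total);
  }

  public static void main(String[] args) {
    System.out.println(mean(List.of(1.0, 2.0, 3.0, 6.0)));
  }
}
```

Here InputT and OutputT are both Double while AccumT is SumCount, so the
only function that has to be associative and commutative is the combiner.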

For starters, "associative" is not even a well-typed word to use unless
input type and output type are the same. So it is `combiner` that needs to
be associative and commutative. Sometimes `combiner` also has an identity
element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
things here (the latter is never meaningfully used). When we say a
CombineFn has to be "associative" and "commutative" we just mean that it
can be factored into these methods.

So the SerializableFunction just needs to be factorable into these methods,
too, like Luke said. Pragmatically, if we only have a
SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
do hierarchical combines (can't feed the output of one layer into the next
layer), so associativity is irrelevant and it might as well be a
MapElements. So it only makes sense to allow
SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
binary function would make sense for lambdas, etc.
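
To make the hierarchical-combine point concrete, here is a small
plain-Java sketch (again not Beam code; `HierarchicalCombineSketch` and
the shard layout are assumptions for illustration). It applies an
Iterable<InputT> -> OutputT mean in two layers, then does the same with
an Iterable<AccumT> -> AccumT merge over {sum, count} pairs.

```java
import java.util.ArrayList;
import java.util.List;

final class HierarchicalCombineSketch {
  /** Iterable<InputT> -> OutputT: a mean that returns only the final number. */
  static double mean(List<Double> values) {
    double sum = 0;
    for (double v : values) {
      sum += v;
    }
    return sum / values.size();
  }

  /** Iterable<AccumT> -> AccumT: merges {sum, count} pairs into one pair. */
  static double[] merge(List<double[]> accumulators) {
    double sum = 0;
    double count = 0;
    for (double[] a : accumulators) {
      sum += a[0];
      count += a[1];
    }
    return new double[] {sum, count};
  }

  /** Two layers of the input-to-output function: a mean of per-shard means. */
  static double layeredMean(List<List<Double>> shards) {
    List<Double> partials = new ArrayList<>();
    for (List<Double> shard : shards) {
      partials.add(mean(shard));
    }
    return mean(partials);
  }

  /** Two layers of merge, with init and extract applied only at the edges. */
  static double layeredMerge(List<List<Double>> shards) {
    List<double[]> partials = new ArrayList<>();
    for (List<Double> shard : shards) {
      List<double[]> accumulators = new ArrayList<>();
      for (double v : shard) {
        accumulators.add(new double[] {v, 1}); // init
      }
      partials.add(merge(accumulators));
    }
    double[] total = merge(partials);
    return total[0] / total[1]; // extract
  }

  public static void main(String[] args) {
    // Two hypothetical workers hold [1, 2, 3] and [10]; the true mean is 4.0.
    List<List<Double>> shards = List.of(List.of(1.0, 2.0, 3.0), List.of(10.0));
    System.out.println(layeredMean(shards));  // 6.0, not the true mean
    System.out.println(layeredMerge(shards)); // 4.0
  }
}
```

The layered mean only type checks because InputT and OutputT happen to
both be Double here, and even then it gives the wrong answer. With
different input and output types the second layer could not be written
at all.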

Here are some reasons for the particular design of CombineFn that actually
should be called out:

 - It is a major efficiency gain to mutate the accumulator.
 - Usually `init` is trivial and best to inline, hence addInput(AccumT,
InputT)
 - With `compact` we allow multiple physical representations of the same
semantic accumulator, and a hook to switch between them
 - And it is hard to take the user through the journey from the real
reasons behind it to the particular Java interface
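
One way to picture the `compact` point is the plain-Java sketch below. It
is an assumption about how such a hook might be used, not code from Beam;
`CompactSketch`, `Accum` and the method bodies are hypothetical. The
accumulator has two physical forms for the same logical set of distinct
strings: a cheap append-only buffer and a deduplicated set.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CompactSketch {
  /** Hypothetical accumulator with two physical forms for one logical set. */
  static final class Accum {
    final List<String> pending = new ArrayList<>(); // cheap to append to
    final Set<String> distinct = new HashSet<>();   // deduplicated form
  }

  /** Mutates the accumulator in place; no deduplication work yet. */
  static Accum addInput(Accum acc, String input) {
    acc.pending.add(input);
    return acc;
  }

  /** Same logical value, smaller physical representation. */
  static Accum compact(Accum acc) {
    acc.distinct.addAll(acc.pending);
    acc.pending.clear();
    return acc;
  }

  /** Number of distinct inputs seen so far. */
  static int extractOutput(Accum acc) {
    return compact(acc).distinct.size();
  }

  public static void main(String[] args) {
    Accum acc = new Accum();
    for (String s : List.of("a", "a", "b")) {
      acc = addInput(acc, s);
    }
    System.out.println(extractOutput(acc));
  }
}
```

In this sketch a runner could call compact before shipping the
accumulator elsewhere, so the buffered duplicates never leave the worker.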

Note also that CombineWithContext allows side inputs, which complicates the
formalities somewhat but doesn't change the intuition.

Kenn

On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang <ow...@gmail.com> wrote:

> I'm a bit confused here because neither of them requires the same type of
> input and output. Also, the Javadoc of Globally says "It is common for
> {@code InputT == OutputT}, but not required". If associativity and
> commutativity are expected, why don't they have restrictions like
> Combine.perKey(SerializableFunction)?
>
> I understand the motive and requirement behind Combine functions. I'm more
> asking about the user interface consistency.
> By the way, it's hard to know what Combine.Globally does from the name but
> that discussion should be put in another thread.
>
> Thanks for your patience here.
>
> Manu
>
> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
>
> GlobalCombineFn and PerKeyCombineFn still expect an associative and
> commutative function when accumulating.
> GlobalCombineFn is shorthand for assigning everything to a single key,
> doing the combine, and then discarding the key and extracting the single
> output.
> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
> modify the accumulation in any way.
>
> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the
> same requirements?
>
> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>
> For it to be considered a combiner, the function needs to be associative
> and commutative.
>
> The issue is that from an API perspective it would be easy to have a
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
> people in the data processing world expect that this
> parallelization/optimization is performed, so exposing such a method
> would be dangerous: it would break users' expectations. From the
> design perspective it is therefore a hard requirement. If PCollections
> ever become ordered or gain other properties, these requirements may
> loosen, but that seems unlikely in the short term.
>
> At this point, I think you're looking for a MapElements to which you
> pass a SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
> OutputT>> which can delegate to a SerializableFunction<Iterable<InputT>,
> OutputT> should be trivial.
>
>
> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
> Thanks for the thorough explanation. I see the benefits for such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's the monoid
> rule) but are possible and easier to write with
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
> difficult to provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> Combine.perKey takes a single SerializableFunction which knows how to
> convert from Iterable<V> to V.
>
> It turns out that many runners implement optimizations which allow them to
> run the combine operation across several machines to parallelize the work
> and potentially reduce the amount of data they store during a GBK.
> To be able to do such an optimization, you actually need
> three functions:
> InputT -> AccumulatorT: Creates the intermediate representation which
> allows for associative combining
> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
> AccumulatorT -> OutputT: Extracts the output
>
> In the case of Combine.perKey with a SerializableFunction, you're providing
> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
> identity functions.
>
> Supporting a Combine.perKey that goes from Iterable<InputT> -> OutputT
> would require the whole combine to run on a single machine, removing the
> parallelization benefits that runners provide. In almost all cases that
> is not a good idea.
>
> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <ow...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
> > and
> > output to be of the same type while `Combine.PerKey` doesn't have this
> > restriction.
> >
> > Thanks,
> > Manu
> >
>
>
>
>
