Posted to user@crunch.apache.org by Chao Shi <st...@live.com> on 2013/09/25 04:59:12 UTC

Ability to specify a combiner (with different signature than reducer)

Hi guys,

I need Crunch to generate an MR pipeline with both a combiner and a reducer.
My combiner and reducer have different logic. I wonder if this is possible
in Crunch.

The problem can be simplified as the following:

Given a series of <string, int> pairs, output the largest K values per key,
joined into a string. For example, with K=2, the output for
<"hello", 1>, <"hello", 2>, <"hello", 3>, <"world", 3> is
<"hello", "2, 3">, <"world", "3">.

In raw MR, I would use a combiner to find the locally largest K values
per key.

class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  void reduce(Text key, Iterable<IntWritable> values, Context context) {
    // go over "values" and keep the top K in memory
    // emit the top K
  }
}

class MyReducer extends Reducer<Text, IntWritable, Text, Text> {
  void reduce(Text key, Iterable<IntWritable> values, Context context) {
    // go over "values" and keep the top K in memory, saved to "int[] top"
    context.write(key, new Text(join(top, ", ")));
  }
}

Could anyone give me a hint on how to do this in Crunch? I see
PGroupedTable#combineValues, but I think it requires the combiner and
reducer to have the same signature (generic types).

Thanks,
Chao
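
The per-key logic described in the question can be sketched in plain Java, independent of Hadoop or Crunch. This is only an illustration of the "keep top K in memory, then join" step; the class and method names (TopKSketch, largest, join) are hypothetical and do not come from either library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of Crunch or Hadoop.
class TopKSketch {

  // Returns the k largest values in ascending order, keeping at most k + 1 values in memory.
  static List<Integer> largest(Iterable<Integer> values, int k) {
    PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap: smallest kept value on top
    for (Integer v : values) {
      heap.add(v);
      if (heap.size() > k) {
        heap.poll(); // evict the smallest so only the k largest remain
      }
    }
    List<Integer> result = new ArrayList<>(heap);
    Collections.sort(result);
    return result;
  }

  // Joins the values with ", " as in the example output of the question.
  static String join(List<Integer> values) {
    return values.stream().map(String::valueOf).collect(Collectors.joining(", "));
  }

  public static void main(String[] args) {
    System.out.println(join(largest(Arrays.asList(1, 2, 3), 2))); // prints "2, 3"
    System.out.println(join(largest(Arrays.asList(3), 2)));       // prints "3"
  }
}
```

The two calls in main correspond to the "hello" and "world" keys of the example with K=2.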

Re: Ability to specify a combiner (with different signature than reducer)

Posted by Gabriel Reid <ga...@gmail.com>.
On Wed, Sep 25, 2013 at 2:36 PM, Josh Wills <jo...@gmail.com> wrote:

> FWIW, what I usually do in these situations (and they seem to come up a
> lot for machine learning projects) is use a combiner with a post-processing
> reducer that has a different signature. Chao's case is a little different
> because the DoFn needs to know whether it's in the combiner or the reducer
> contexts, but the Crunch framework knows this via the NodeContext, so there
> must be a way to communicate this to the CombineFn. If there isn't, we
> should make a change to expose it.
>

That sounds like it would be pretty handy -- I remember someone else on the
list asking about a similar thing a few months ago as well.


>
> For this example, the output of both my Combiner and my Reducer would be a
> Collection<Integer>. If I was in the combiner context, I would emit just a
> single Integer in that collection (the max seen by that combiner), and if I
> was in the reducer context, I would emit the entire Iterable<Integer> as a
> Collection<Integer>. Then I would have a post-processing MapFn that would
> take the values from the Collection<Integer> and join them into a string.
>

I think that's along the same kind of line that I was going with, but if
I'm understanding the issue correctly then there shouldn't even be a need
to know if you're in the reducer or combiner if you're working with
Collection<Integer>. I think that the combiner would be outputting the
top-k entries, and not just the top-1 entry, so both the combiner and the
reducer have the same logic, and can be the same class (although this
necessitates converting the PTable<K, V> to PTable<K, Collection<V>> at the
start).

- Gabriel
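
A small plain-Java sketch of why the combiner has to emit the top-k entries rather than only the top-1 entry: if all values for a key happen to sit on one map task, a top-1 combiner discards values the reducer still needs. The class and method names here are hypothetical and only simulate the two stages; nothing below is a Crunch API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of a combiner stage followed by a reducer stage.
class CombinerOutputSketch {

  // Returns the k largest values in ascending order.
  static List<Integer> topK(List<Integer> values, int k) {
    List<Integer> sorted = new ArrayList<>(values);
    Collections.sort(sorted);
    int from = Math.max(0, sorted.size() - k);
    return new ArrayList<>(sorted.subList(from, sorted.size()));
  }

  public static void main(String[] args) {
    int k = 2;
    List<Integer> valuesOnOneMapTask = Arrays.asList(1, 2, 3);

    // Combiner keeps only the maximum: the reducer can no longer recover the value 2.
    List<Integer> afterTop1Combiner = topK(valuesOnOneMapTask, 1);
    System.out.println(topK(afterTop1Combiner, k)); // prints "[3]"

    // Combiner keeps the top k: the reducer still sees everything it needs.
    List<Integer> afterTopKCombiner = topK(valuesOnOneMapTask, k);
    System.out.println(topK(afterTopKCombiner, k)); // prints "[2, 3]"
  }
}
```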



Re: Ability to specify a combiner (with different signature than reducer)

Posted by Josh Wills <jo...@gmail.com>.
Hey Chao/Gabriel,

You two seem to be agreeing, which makes me think I misread Chao's initial
problem specification. :) In any case, it seems like the PTable<K,
Collection<V>> approach will do what you want here, which makes me happy.

J



Re: Ability to specify a combiner (with different signature than reducer)

Posted by Chao Shi <st...@live.com>.
Hi Josh,

I don't quite understand your second paragraph. Did you mean Gabriel's
approach? Since the reducer reads the output of the combiner, it must
read a PTable<String, Collection<Integer>>. In fact, with this approach, I
don't think the CombineFn needs to know whether it is running in the combiner
or the reducer context: it simply emits the top K values. If there is not much
overhead in using the singleton collections, I think this approach fits
Crunch's model perfectly.



Re: Ability to specify a combiner (with different signature than reducer)

Posted by Josh Wills <jo...@gmail.com>.
FWIW, what I usually do in these situations (and they seem to come up a lot
for machine learning projects) is use a combiner with a post-processing
reducer that has a different signature. Chao's case is a little different
because the DoFn needs to know whether it's in the combiner or the reducer
contexts, but the Crunch framework knows this via the NodeContext, so there
must be a way to communicate this to the CombineFn. If there isn't, we
should make a change to expose it.

For this example, the output of both my Combiner and my Reducer would be a
Collection<Integer>. If I was in the combiner context, I would emit just a
single Integer in that collection (the max seen by that combiner), and if I
was in the reducer context, I would emit the entire Iterable<Integer> as a
Collection<Integer>. Then I would have a post-processing MapFn that would
take the values from the Collection<Integer> and join them into a string.



Re: Ability to specify a combiner (with different signature than reducer)

Posted by Chao Shi <st...@live.com>.
Yes. It was a typo. I mean PTable#combineValues.


2013/9/25 Gabriel Reid <ga...@gmail.com>

> Hi Chao,
>
>
>> Your approach is tricky. I agree that this kind of MR logic is pretty
>> common, so it would be nice to add such a feature to Crunch. At first
>> glance, I think the problem with PTable#collectValues is that it returns a
>> PTable rather than a PGroupedTable (I haven't checked the internal logic yet).
>>
>>
> I think that PTable#collectValues is for a different kind of use case --
> internally it just does a groupByKey and then puts all the values in a
> single collection for each key, so I'm not sure how it would apply here. Or
> did you mean the combineValues method?
>
> - Gabriel
>

Re: Ability to specify a combiner (with different signature than reducer)

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Chao,


> Your approach is tricky. I agree that this kind of MR logic is pretty
> common, so it would be nice to add such a feature to Crunch. At first
> glance, I think the problem with PTable#collectValues is that it returns a
> PTable rather than a PGroupedTable (I haven't checked the internal logic yet).
>
>
I think that PTable#collectValues is for a different kind of use case --
internally it just does a groupByKey and then puts all the values in a
single collection for each key, so I'm not sure how it would apply here. Or
did you mean the combineValues method?

- Gabriel

Re: Ability to specify a combiner (with different signature than reducer)

Posted by Chao Shi <st...@live.com>.
Hi Gabriel,

Your approach is tricky. I agree that this kind of MR logic is pretty
common, so it would be nice to add such a feature to Crunch. At first
glance, I think the problem with PTable#collectValues is that it returns a
PTable rather than a PGroupedTable (I haven't checked the internal logic yet).



Re: Ability to specify a combiner (with different signature than reducer)

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Chao,

I don't think it's currently possible to have separate combiner and reducer logic.

Actually, looking at what you want to do, it seems that there isn't a really simple way to do it in Crunch, which seems surprising because doing something with the top-n values per key seems like it would come up pretty often.

The best way I can think of accomplishing it would be to do something like this:

	// Convert the original PTable into a table of <K, Collection<V>> with single values in the collection
	// (the PType arguments that parallelDo requires are omitted here for brevity)
	PTable<K,V> input = …;
	PTable<K, Collection<V>> tableOfCollections = input.parallelDo(new ValueToSingleElementCollectionFn());

	// Use a custom CombineFn that collects the top values per key by doing a nested loop over the
	// incoming iterable of collections
	PTable<K, Collection<V>> topValuesPerKey =
		tableOfCollections.groupByKey().combineValues(new NestedLoopTopKCombineFn());

	// Join each key's top values into a single string
	PTable<K, String> withJoinedValues = topValuesPerKey.parallelDo(new JoinValuesAsStringFn());


This feels pretty hacky, but as far as I can see it's the easiest way to use a Combiner as part of the
top-k selection. I'm also a bit worried about the overhead that the single-element collections would
introduce, and have a nagging feeling that there must be a better way, but I don't see it at the moment.

- Gabriel
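
The nested-loop merge described above can be sketched in plain Java, without any Crunch dependency. This is only an illustration under stated assumptions: NestedLoopTopKSketch, mergeTopK and singletons are hypothetical names, the values are Integers, and the combiner and reducer stages are simulated by calling the same function twice. The point is that one function over an iterable of collections serves both stages, because its output type matches its input element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for the logic of a top-k CombineFn; not a Crunch class.
class NestedLoopTopKSketch {

  // Nested loop over the incoming collections, keeping only the k largest values (ascending).
  static Collection<Integer> mergeTopK(Iterable<Collection<Integer>> collections, int k) {
    PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap of the current top k
    for (Collection<Integer> collection : collections) {
      for (Integer v : collection) {
        heap.add(v);
        if (heap.size() > k) {
          heap.poll(); // drop the smallest value
        }
      }
    }
    List<Integer> result = new ArrayList<>(heap);
    Collections.sort(result);
    return result;
  }

  // Wraps each value in a single-element collection, as in the first step of the pipeline.
  static List<Collection<Integer>> singletons(Integer... values) {
    List<Collection<Integer>> out = new ArrayList<>();
    for (Integer v : values) {
      out.add(Collections.singletonList(v));
    }
    return out;
  }

  public static void main(String[] args) {
    int k = 2;
    // Simulated combiner stage: two map tasks hold different values for the same key.
    Collection<Integer> combinedA = mergeTopK(singletons(1, 5, 2), k); // [2, 5]
    Collection<Integer> combinedB = mergeTopK(singletons(4, 3), k);    // [3, 4]
    // Simulated reducer stage: the same function merges the combiner outputs.
    Collection<Integer> reduced = mergeTopK(Arrays.asList(combinedA, combinedB), k);
    System.out.println(reduced); // prints "[4, 5]"
  }
}
```

The final result matches what a single pass over all five values would give, which is what makes it safe to run the same logic in both stages.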



Re: Ability to specify a combiner (with different signature than reducer)

Posted by Chao Shi <st...@live.com>.
Hi Som,

This approach does not make use of combiners. When K is small, using a
combiner can greatly reduce the shuffle traffic. (Correct me if I'm wrong.)
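
A rough back-of-the-envelope model of that claim, written as plain Java. It assumes, for simplicity, that the combiner runs exactly once per map task over all of that task's values for a key (Hadoop does not guarantee this), and it counts shuffled values for a single key only. The class name, method names and the per-task counts are made up for illustration.

```java
// Hypothetical model of shuffle volume for one key; not a Hadoop or Crunch API.
class ShuffleCountSketch {

  // Values shuffled when no combiner runs: every value leaves its map task.
  static long withoutCombiner(long[] valuesPerMapTask) {
    long total = 0;
    for (long n : valuesPerMapTask) {
      total += n;
    }
    return total;
  }

  // Values shuffled when each map task's values are combined down to at most k.
  static long withTopKCombiner(long[] valuesPerMapTask, int k) {
    long total = 0;
    for (long n : valuesPerMapTask) {
      total += Math.min(n, k);
    }
    return total;
  }

  public static void main(String[] args) {
    long[] valuesPerMapTask = {1000, 500, 2000}; // made-up counts for three map tasks
    System.out.println(withoutCombiner(valuesPerMapTask));     // prints 3500
    System.out.println(withTopKCombiner(valuesPerMapTask, 2)); // prints 6
  }
}
```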


2013/9/25 Som Satpathy <so...@gmail.com>

> Hi Chao,
>
> You could do a groupBy and then do a parallelDo to iterate over the key
> values to emit the top K values per key via Pair<K,V>.
>
> Som
>
>

Re: Ability to specify a combiner (with different signature than reducer)

Posted by Som Satpathy <so...@gmail.com>.
Hi Chao,

You could do a groupByKey and then a parallelDo that iterates over each key's
values and emits the top K values per key as Pair<K, V>.

Som

