Posted to common-user@hadoop.apache.org by "Streckfus, William [USA]" <st...@bah.com> on 2009/04/13 23:53:13 UTC
Grouping Values for Reducer Input
Hi Everyone,
I'm working on a relatively simple MapReduce job with a slight complication
with regard to the ordering of my key/value pairs heading into the reducer. The
output from the mapper might be something like:
cat -> doc5, 1
cat -> doc1, 1
cat -> doc5, 3
...
Here, 'cat' is my key and the value is the document ID and the count (my own
WritableComparable). Originally I was going to create a HashMap in the
reduce method, add an entry for each document ID, and sum the counts for
each. I realized the approach would work better if the values arrived sorted
by document ID, like so:
cat -> doc1, 1
cat -> doc5, 1
cat -> doc5, 3
...
With this ordering I can keep summing until I reach a new document ID and
collect the output at that point, avoiding extra data structures and
object-creation costs. I tried setting
JobConf.setOutputValueGroupingComparator(), but this didn't seem to do
anything. In fact, I threw an exception from the Comparator I supplied, and
it never showed up when running the job. My map output value consists of a
UTF string and a long, so perhaps the Comparator I'm using (identical to
Text.Comparator) is incorrect:
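// From a WritableComparator subclass; same logic as Text.Comparator
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  // Size of the VInt length prefix at the start of each serialized record
  int n1 = WritableUtils.decodeVIntSize(b1[s1]);
  int n2 = WritableUtils.decodeVIntSize(b2[s2]);
  // Skip the prefix and compare the remaining bytes lexicographically
  return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
}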
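If TermFrequencyWritable.write() serializes the document ID with DataOutput.writeUTF() (an assumption; the post only says "a UTF and a Long"), its bytes start with a two-byte length rather than the VInt that Text.Comparator skips. For an ID shorter than 256 bytes the first byte is 0, which decodeVIntSize() treats as a one-byte VInt, so the copied comparator would skip only one byte and then also compare the trailing long: two values with the same document ID but different counts would compare unequal. The pure-JDK sketch below (class and method names are hypothetical, no Hadoop classes) shows the writeUTF layout and a byte-level comparison on the document ID alone; inside a WritableComparator subclass the loop could be replaced by compareBytes(b1, s1 + 2, len1, b2, s2 + 2, len2). This only fixes the byte layout: Hadoop sorts and groups map output by key, never by value, so such a comparator still would not reorder values on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Pure-JDK sketch, no Hadoop classes. ASSUMPTION: the value's write() method does
// out.writeUTF(documentID) followed by out.writeLong(frequency).
final class UtfLongLayout {

  // Serializes a (documentID, frequency) pair the way the assumed write() would.
  static byte[] serialize(String documentID, long frequency) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeUTF(documentID);  // 2-byte unsigned length, then modified UTF-8 bytes
      out.writeLong(frequency);  // 8 bytes, big-endian
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);  // not expected for an in-memory stream
    }
  }

  // Reads the 2-byte big-endian length that writeUTF puts in front of the string.
  static int utfLength(byte[] b, int s) {
    return ((b[s] & 0xff) << 8) | (b[s + 1] & 0xff);
  }

  // Compares two serialized values by documentID only: skip the 2-byte prefix
  // (not a VInt), compare the string bytes as unsigned bytes, ignore the long.
  static int compareDocIds(byte[] b1, int s1, byte[] b2, int s2) {
    int len1 = utfLength(b1, s1);
    int len2 = utfLength(b2, s2);
    for (int i = 0; i < Math.min(len1, len2); i++) {
      int x = b1[s1 + 2 + i] & 0xff;
      int y = b2[s2 + 2 + i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    return len1 - len2;  // a shorter ID that is a prefix of the other sorts first
  }
}
```

For plain ASCII document IDs this unsigned byte order matches ordinary string order.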
In my final output, the same word -> documentID pair is emitted multiple
times. For the example above, I get several lines of the form cat -> doc5, X.
Here is the reducer method, just in case:
// Reused output object; assumed to be a field of the reducer class
private final TermFrequencyWritable termFrequency = new TermFrequencyWritable();

public void reduce(Text key, Iterator<TermFrequencyWritable> values,
    OutputCollector<Text, TermFrequencyWritable> output, Reporter reporter)
    throws IOException {
  long sum = 0;
  String lastDocID = null;
  // Iterate through all values
  while (values.hasNext()) {
    TermFrequencyWritable value = values.next();
    // Encountered a new document ID: record the previous one and reset
    if (!value.getDocumentID().equals(lastDocID)) {
      // Nothing to record on the first pass
      if (lastDocID != null) {
        termFrequency.setDocumentID(lastDocID);
        termFrequency.setFrequency(sum);
        output.collect(key, termFrequency);
      }
      sum = 0;
      lastDocID = value.getDocumentID();
    }
    sum += value.getFrequency();
  }
  // Record the last one
  termFrequency.setDocumentID(lastDocID);
  termFrequency.setFrequency(sum);
  output.collect(key, termFrequency);
}
Any ideas? (I'm using Hadoop 0.19.1.)
Thanks,
- Bill
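The duplicate cat -> doc5 lines follow directly from the reducer loop: Hadoop sorts map output by key only, so within one reduce() call the values for "cat" can arrive in any order, and the loop emits a new total every time the document ID changes. A minimal pure-Java model of that loop (no Hadoop classes; DocCount is a hypothetical stand-in for TermFrequencyWritable, and totals go into a list instead of an OutputCollector):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Pure-JDK model of the reduce() loop. DocCount is a hypothetical stand-in
// for TermFrequencyWritable; totals are collected as "docId=total" strings.
final class RunSumSketch {

  static final class DocCount {
    final String docId;
    final long count;

    DocCount(String docId, long count) {
      this.docId = docId;
      this.count = count;
    }
  }

  // Emits "docId=total" each time the document ID changes, like the reducer does.
  static List<String> sumRuns(Iterator<DocCount> values) {
    List<String> out = new ArrayList<>();
    long sum = 0;
    String lastDocId = null;
    while (values.hasNext()) {
      DocCount value = values.next();
      if (!value.docId.equals(lastDocId)) {
        if (lastDocId != null) {
          out.add(lastDocId + "=" + sum);
        }
        sum = 0;
        lastDocId = value.docId;
      }
      sum += value.count;
    }
    if (lastDocId != null) {
      out.add(lastDocId + "=" + sum);  // record the last run
    }
    return out;
  }

  public static void main(String[] args) {
    // Values for key "cat" in an unsorted arrival order vs. sorted by document ID.
    List<DocCount> arrival = Arrays.asList(
        new DocCount("doc5", 1), new DocCount("doc1", 1), new DocCount("doc5", 3));
    List<DocCount> sorted = Arrays.asList(
        new DocCount("doc1", 1), new DocCount("doc5", 1), new DocCount("doc5", 3));
    System.out.println(sumRuns(arrival.iterator()));  // prints [doc5=1, doc1=1, doc5=3]
    System.out.println(sumRuns(sorted.iterator()));   // prints [doc1=1, doc5=4]
  }
}
```

The loop itself is fine; it just needs the values sorted by document ID, which the replies below get by moving the document ID into the key.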
RE: Grouping Values for Reducer Input
Posted by je...@orange-ftgroup.com.
I'm not familiar with setOutputValueGroupingComparator, but what about adding
the doc# to the key and writing your own hashing Partitioner?
So the map output would look something like:
cat_doc5 -> 1
cat_doc1 -> 1
cat_doc5 -> 3
The partitioner would hash only the part before the "_".
The shuffle would still send all the cat_* keys to the same reducer, because of
your hash, but sort them by the full key the way you need:
cat_doc1 -> 1
cat_doc5 -> 1
cat_doc5 -> 3
Then the reduce task can count for each doc# within "cat".
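A minimal pure-JDK sketch of that partitioning rule, assuming keys of the form word_docID and that "_" never appears inside the word (class and method names are hypothetical). It uses the same (hash & Integer.MAX_VALUE) % numReduceTasks arithmetic as Hadoop's default HashPartitioner, but hashes only the word, and with String.hashCode() rather than Text's hash. In a 0.19 job this logic would sit in the getPartition() method of a class implementing org.apache.hadoop.mapred.Partitioner, registered with JobConf.setPartitionerClass().

```java
// Pure-JDK sketch: route every word_docID key by its word alone.
final class PrefixPartitionSketch {

  // The natural key: everything before the first "_" (the whole key if there is none).
  static String naturalKey(String compositeKey) {
    int sep = compositeKey.indexOf('_');
    return sep < 0 ? compositeKey : compositeKey.substring(0, sep);
  }

  // HashPartitioner-style arithmetic, but hashing only the natural key.
  static int partitionFor(String compositeKey, int numReduceTasks) {
    return (naturalKey(compositeKey).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
```

With this rule, partitionFor("cat_doc1", n) and partitionFor("cat_doc5", n) are always equal, whereas a partitioner that hashes the whole key may send them to different reducers.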
Re: Grouping Values for Reducer Input
Posted by Jim Twensky <ji...@gmail.com>.
Oh, I forgot to mention that you should change your partitioner to send all the
keys of the form cat,* to the same reducer, but it seems Jeremy was much faster
than me :)
-Jim
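Combining the replies with the grouping comparator from the original question gives the usual "secondary sort" setup. setOutputValueGroupingComparator() only ever compares map output keys (it decides which consecutive sorted keys share one reduce() call), so it cannot reorder values while the document ID lives in the value. With a composite (word, docID) key instead, the job sorts on the whole key, partitions on the word, and groups on the word, so each reduce() call covers one word and sees its records ordered by document ID. The sketch below models that shuffle with plain Java collections only (WordDoc, Record and the method names are hypothetical, not Hadoop API); in a 0.19 job the three pieces would correspond to JobConf.setOutputKeyComparatorClass(), setPartitionerClass() and setOutputValueGroupingComparator().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Pure-JDK model of a secondary sort. WordDoc is a hypothetical composite key.
final class SecondarySortSketch {

  static final class WordDoc {
    final String word;
    final String docId;
    WordDoc(String word, String docId) { this.word = word; this.docId = docId; }
  }

  static final class Record {
    final WordDoc key;
    final long count;
    Record(WordDoc key, long count) { this.key = key; this.count = count; }
  }

  // Sort comparator: the full composite key (word, then docId).
  static final Comparator<WordDoc> SORT =
      Comparator.comparing((WordDoc k) -> k.word).thenComparing(k -> k.docId);

  // Grouping comparator: the word only, so one reduce() call covers all docs of a word.
  static final Comparator<WordDoc> GROUP = Comparator.comparing((WordDoc k) -> k.word);

  // Partitioner: the word only, so all (word, *) keys reach the same reducer.
  static int partition(WordDoc key, int numReduceTasks) {
    return (key.word.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  // Simulates the shuffle for one reducer and returns "word docId=sum" lines.
  static List<String> shuffleAndReduce(List<Record> mapOutput, int reducer, int numReduceTasks) {
    List<Record> mine = new ArrayList<>();
    for (Record r : mapOutput) {
      if (partition(r.key, numReduceTasks) == reducer) {
        mine.add(r);
      }
    }
    mine.sort((a, b) -> SORT.compare(a.key, b.key));

    List<String> out = new ArrayList<>();
    int i = 0;
    while (i < mine.size()) {
      // One simulated reduce() call: the run of records whose keys GROUP-compare equal.
      WordDoc first = mine.get(i).key;
      String lastDocId = null;
      long sum = 0;
      for (; i < mine.size() && GROUP.compare(first, mine.get(i).key) == 0; i++) {
        Record r = mine.get(i);
        if (!r.key.docId.equals(lastDocId)) {
          if (lastDocId != null) {
            out.add(first.word + " " + lastDocId + "=" + sum);
          }
          lastDocId = r.key.docId;
          sum = 0;
        }
        sum += r.count;
      }
      out.add(first.word + " " + lastDocId + "=" + sum);
    }
    return out;
  }
}
```

The model reads the document ID from each record's key. In a real job, keeping the document ID in the value as well is a simple way to let the existing reduce() loop run unchanged.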
Re: Grouping Values for Reducer Input
Posted by Jim Twensky <ji...@gmail.com> on Mon, Apr 13, 2009 at 5:24 PM.
I'm not sure if this is exactly what you want, but can you emit map records
as:
cat, doc5 -> 3
cat, doc1 -> 1
cat, doc5 -> 1
and so on?
This way, your reducers will get the intermediate key/value pairs sorted by key:
cat, doc1 -> 1
cat, doc5 -> 3
cat, doc5 -> 1
Then you can split the keys (cat, doc*) inside the reducer and perform your
additions.
-Jim
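A pure-JDK model of this suggestion (names are hypothetical; a TreeMap stands in for the shuffle's sort-and-group by full key, which matches Hadoop's Text ordering for ASCII keys). With the default grouping, every distinct "cat,docN" key gets its own reduce() call, so the reducer only sums the values and splits the key back into word and document ID; no value comparator is needed. Without the custom partitioner from Jim's other reply, cat,doc1 and cat,doc5 may land on different reducers, which changes only which output file each line ends up in, not the sums.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Pure-JDK model of emitting "word,docId" as the map output key.
final class CompositeKeySketch {

  // One simulated reduce() call: sum the values for a single "word,docId" key,
  // then split the key back into its two parts.
  static String reduce(String compositeKey, List<Long> values) {
    String[] parts = compositeKey.split(",", 2);  // assumes the word contains no ","
    long sum = 0;
    for (long v : values) {
      sum += v;
    }
    return parts[0] + " -> " + parts[1] + ", " + sum;
  }

  // Groups map output by full key in sorted order (like the shuffle), then reduces.
  static List<String> run(List<String> keys, List<Long> counts) {
    Map<String, List<Long>> groups = new TreeMap<>();
    for (int i = 0; i < keys.size(); i++) {
      groups.computeIfAbsent(keys.get(i), k -> new ArrayList<>()).add(counts.get(i));
    }
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<Long>> e : groups.entrySet()) {
      out.add(reduce(e.getKey(), e.getValue()));
    }
    return out;
  }
}
```

For the map output above (written without the space, as cat,doc5 -> 3, cat,doc1 -> 1, cat,doc5 -> 1), run() returns the lines cat -> doc1, 1 and cat -> doc5, 4, i.e. one line per word/document pair.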