Posted to mapreduce-user@hadoop.apache.org by Geoffry Roberts <ge...@gmail.com> on 2011/05/12 19:44:14 UTC

Number of Reducers Set to One

All,

I am mostly seeking confirmation as to my thinking on this matter.

I have an MR job that I believe will force me into using a single reducer.
The nature of the process is one where calculations performed on a given
record rely on certain accumulated values whose calculation depends on
rolling values from all prior records.  An ultra-simple example of this
would be a balance-forward situation.  (I'm not doing accounting; I'm doing
epidemiology, but the concept is the same.)

Is a single reducer the best way to go in this case?

Thanks
-- 
Geoffry Roberts

Re: Number of Reducers Set to One

Posted by Robert Evans <ev...@yahoo-inc.com>.
You could merge the side-effect files before running the second map job if you want to, or you could just leave them as separate files and read each one in the mapper.  If there are lots of files, the NameNode may get hit with too many requests, which slows down the entire cluster.  Using the distributed cache to transfer the files mitigates this, but on a large cluster it could still cause some issues, so if there are more than about 20 files you probably want to concatenate them before launching the second map job.
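
As a rough illustration of the concatenation step, here is a minimal Python sketch. It assumes the side-effect files have already been copied to local disk and that each line holds a minimum time and a count separated by a tab; the function name merge_side_files and the integer timestamps are hypothetical, chosen only for this example.

```python
def merge_side_files(paths, merged_path):
    """Concatenate small side-effect files into one file.

    Each input line is assumed to be "<minTime><TAB><count>" with integer
    values. Returns the parsed rows, ordered by minTime.
    """
    rows = []
    for path in paths:
        with open(path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines
                min_time, count = line.split("\t")
                rows.append((int(min_time), int(count)))
    rows.sort()  # order by bucket start time
    with open(merged_path, "w") as out:
        for min_time, count in rows:
            out.write(f"{min_time}\t{count}\n")
    return rows
```

The second job then only has to open one small file instead of one file per reducer.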

So, for example, if you wanted to know how many people are in Las Vegas at any point in time, you could do the following. NOTE: this is pseudocode and is not tested in any way.

Mapper1 {
  Map: Key<Person person>, Value<Time arrived, Time departed> {
    output.collect(Key<arrived>, Value<person, 1>);
    output.collect(Key<departed>, Value<person, -1>);
  }
}

Partitioner1 {
  Partition: Key<Time time>, Value<Person person, Int count>, Int numPartitions {
    Time beginningTime = readFromConf();
    Time endingTime = readFromConf();
    long timePerBucket = (endingTime - beginningTime)/numPartitions;
    //offset by beginningTime so the first bucket is 0; should probably handle corner cases too
    return (time - beginningTime)/timePerBucket;
  }
}
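
One way the corner cases in the partitioner might be handled is sketched below in Python. This assumes integer timestamps and a known [begin, end] range; the name bucket_for is hypothetical and not part of any Hadoop API.

```python
def bucket_for(time, begin, end, num_partitions):
    """Map a timestamp to a partition index in [0, num_partitions)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    span = end - begin
    if span <= 0:
        return 0  # degenerate range: everything goes to one bucket
    # Offset by the range start, then scale; integer math avoids rounding drift.
    index = (time - begin) * num_partitions // span
    # Clamp so out-of-range times (including time == end) stay in a valid bucket.
    return max(0, min(num_partitions - 1, index))
```

The property that matters for the rest of the scheme is that the result never decreases as time increases, so every time in bucket i sorts before every time in bucket i+1.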

Reducer1 {
  Long seenSoFar = 0;
  Time minTime = VERY_BIG_TIME;
  Reduce: Key<Time time>, Value<Person person, int count> {
    seenSoFar += count;
    if(minTime > time) {
      minTime = time;
    }
    output.collect(Key: time, Value<person, count>);
  }

  Cleanup() {
    //a MultipleOutputFormat may be better to use though
    OutputStream file = fs.create("side effect file"+reducer_number);
    file.write(minTime+"\t"+seenSoFar+"\n");
    file.close();
  }
}
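
To make the first pass concrete, here is a small Python stand-in for what one reducer does with its bucket. It is only a sketch: sorted() plays the role of the framework's sort, the returned summary plays the role of the side-effect file line, and reduce_bucket is a hypothetical name.

```python
def reduce_bucket(events):
    """events: iterable of (time, person, delta) tuples for one bucket.

    Returns the events sorted by time plus a (min_time, net_count) summary,
    or None as the summary when the bucket is empty.
    """
    ordered = sorted(events, key=lambda e: e[0])
    if not ordered:
        return ordered, None  # empty bucket: nothing to summarize
    net_count = sum(delta for _, _, delta in ordered)
    min_time = ordered[0][0]
    return ordered, (min_time, net_count)
```

For example, a bucket holding one arrival at 10, one arrival at 11 and one departure at 12 would be summarized as (10, 1).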

//Not shown: configuration, or an InputFormat that ensures the input files are not split.
Mapper2 {
  seenSoFar = null;
  Map: Key<Time time>, Value<Person person, int count> {
    if(seenSoFar == null) {
      //first record seen by this mapper: add up the totals of all earlier buckets
      List data<minTime, bigcount> = read in the side effect files.
      seenSoFar = 0;
      foreach<minTime, bigcount> {
        if(minTime < time) {
          seenSoFar += bigcount;
        }
      }
    }
    seenSoFar += count;
    output.collect(Key<time>, Value<person, seenSoFar>);
  }
}
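
The second pass can be sketched the same way in Python. This assumes the mapper sees one whole, time-sorted bucket and has the per-bucket summaries in memory; second_pass and the sample numbers are made up for illustration.

```python
def second_pass(bucket_events, summaries):
    """bucket_events: time-sorted (time, person, delta) tuples from one bucket.
    summaries: (min_time, net_count) pairs, one per bucket, from the first pass.

    Yields (time, person, people_present_after_event).
    """
    seen_so_far = None
    for time, person, delta in bucket_events:
        if seen_so_far is None:
            # First record of this bucket: add up every bucket that starts earlier.
            seen_so_far = sum(n for t, n in summaries if t < time)
        seen_so_far += delta
        yield time, person, seen_so_far


summaries = [(10, 2), (20, -1), (30, 0)]
middle_bucket = [(20, "c", 1), (22, "a", -1), (25, "b", -1)]
results = list(second_pass(middle_bucket, summaries))
```

The strict comparison is what keeps the current bucket's own summary out of the starting value; it relies on the first record of the bucket carrying that bucket's minimum time.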

--Bobby Evans


Re: Number of Reducers Set to One

Posted by Geoffry Roberts <ge...@gmail.com>.
Bobby,

Thanks for such a thoughtful response.

I have a data set that represents all the people that pass through Las Vegas
over a course of time, say five years, which comes to about 175 - 200
million people.   Each record is a person, and it contains fields for where
they came from and where they left to, times of arrival and departure, and
infectious state for a given disease, let's say it's influenza.

Playing a what-if game, I need to compute the probable disease state of each
person upon departure.  To do this, one thing -- one of a number of things
-- I must do is keep a running total of how many people are present in Las
Vegas at any moment in time.

I was thinking that running everything through a single reducer would be the
best way of accomplishing this.  Your side effect file: Each reducer writes
out its accumulation to its own file, then you merge these together into one
big accumulation. Right?
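
For reference, the running total described above, done the single-reducer way, amounts to one ordered sweep over arrival and departure events. A minimal Python sketch follows, assuming each record reduces to (person, arrived, departed) with comparable timestamps; occupancy_timeline is a hypothetical name.

```python
def occupancy_timeline(visits):
    """visits: iterable of (person, arrived, departed).

    Returns [(time, people_present)] with one entry per distinct event time.
    """
    events = []
    for person, arrived, departed in visits:
        events.append((arrived, 1))    # arrival adds one person
        events.append((departed, -1))  # departure removes one
    events.sort()
    timeline = []
    present = 0
    for time, delta in events:
        present += delta
        if timeline and timeline[-1][0] == time:
            timeline[-1] = (time, present)  # collapse events sharing a timestamp
        else:
            timeline.append((time, present))
    return timeline
```

Every step depends on the total from the step before it, which is why this form looks like it needs a single reducer.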


-- 
Geoffry Roberts

Re: Number of Reducers Set to One

Posted by Robert Evans <ev...@yahoo-inc.com>.
Geoffry,

That really depends on how much data you are processing and the algorithm you need to use to process it.  I did something similar a while ago with a medium amount of data, and we saw a significant speedup by first assigning each record a new key based on the expected range of keys and dividing the records equally into N buckets (our data was very evenly distributed and we knew its range, but TeraSort does something very similar and has tools to help if your data is not distributed evenly).  Each bucket was then sent to a different reducer, and as we output the now-sorted data we also accumulated some metrics about the data as we saw it go by.  We output these metrics to a side-effect file, one per bucket.

Because the number and size of these side-effect files were rather small, we were then able to pull all of them into every map process as a cache archive and do a second pass over the data.  That pass calculated the accumulated values for each record from all buckets before the current one plus all records in the current bucket before the current record (we had to make sure the map did not split the sorted file that is the output of the previous reduce).  It does not follow a pure map/reduce paradigm, and it took a little bit of math to figure out exactly what values we had to accumulate and to convince ourselves that it would produce the same results, but it worked for us.  I am not familiar with what you are trying to compute, but it could work for you too.
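
For a plain running sum, the "little bit of math" can be checked with a short Python sketch. This is an illustration under simplifying assumptions (the accumulated value is just a sum of deltas, and the buckets are already in global sorted order), not the code we actually ran; running_totals_bucketed is a hypothetical name.

```python
from itertools import accumulate


def running_totals_bucketed(buckets):
    """buckets: list of lists of deltas, already in global sorted order.

    Returns the global running totals, computed bucket by bucket.
    """
    totals = [sum(b) for b in buckets]             # pass 1: one number per bucket
    offsets = [0] + list(accumulate(totals))[:-1]  # sum of all earlier buckets
    result = []
    for offset, bucket in zip(offsets, buckets):   # pass 2: independent per bucket
        result.extend(offset + partial for partial in accumulate(bucket))
    return result


deltas = [1, 1, -1, 1, 1, -1, -1, 1]
buckets = [deltas[0:3], deltas[3:5], deltas[5:8]]
# The bucketed computation matches a single sequential pass over all deltas.
assert running_totals_bucketed(buckets) == list(accumulate(deltas))
```

The same idea carries over to any accumulation where the contribution of the earlier buckets can be summarized by a small value and combined with the in-bucket partial result.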

--Bobby Evans

