Posted to mapreduce-user@hadoop.apache.org by Henry Helgen <hh...@gmail.com> on 2012/03/09 00:02:20 UTC

reduce stop after n records

I am using hadoop 0.20.2 mapreduce API. The program is running fine, just
slower than it could.

I sum values and then use
job.setSortComparatorClass(LongWritable.DecreasingComparator.class) to sort
descending by sum. I need to stop the reducer after outputting the first N
records. This would save the reducer from running over thousands of records
when it only needs the first few records. Is there a solution with the new
mapreduce 0.20.2 API?

-------------------------------------------------------------------
I notice messages from 2008 about this topic:
http://grokbase.com/t/hadoop/common-user/089420wvkx/stop-mr-jobs-after-n-records-have-been-produced

https://issues.apache.org/jira/browse/HADOOP-3973

The last statement follows, but the link is broken.
"You could do this pretty easily by implementing a custom MapRunnable.
There is no equivalent for reduces. The interface proposed in
HADOOP-1230 would support that kind of application. See:
http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
Look at the new Mapper and Reducer interfaces."

Re: reduce stop after n records

Posted by Harsh J <ha...@cloudera.com>.
Henry,

Something like:

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    // Do stuff here you'd do in setup(…) otherwise.
    // Now begin iterating.
    while (context.nextKey()) {
      // Run your reducing function here. Like the following maybe.
      reduce(context.getCurrentKey(), context.getValues(), context);
      // Since you are now in a regular loop, break as necessary
      // whenever you want. Your logic.
    }
    // Do stuff here you'd otherwise do in cleanup(context);
  }
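
Concretely, the cutoff could be a counter checked in that loop. A minimal
sketch (the N constant, the counter, and the surrounding class members are
illustrative assumptions, not code from the thread):

  // Sketch only: emit the first N keys and then stop. N is an assumed
  // constant; keys arrive already sorted (descending by sum, per the
  // job's sort comparator), so the first N keys are the top N.
  private static final int N = 50;

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    int seen = 0;
    while (context.nextKey() && seen < N) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      seen++;
    }
    cleanup(context);
  }

Keys beyond the first N are simply never iterated, so reduce() is not called
for them.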

On Fri, Mar 9, 2012 at 11:32 PM, Henry Helgen <hh...@gmail.com> wrote:
> Thanks, the presentation is helpful. I am using the new API's context
> objects, but am not familiar with how to use the run method to accomplish
> this. Do you have some code ideas or examples?
>
> job code follows. Is this right?
> -------------------------------------------------------------------------------------------
>   public static void main(String[] args) throws Exception {
>     /**
>      * The main accepts the input and output directories as command line
>      * parameters, then defines the job classes and submits two jobs.
>      * The first job filters out languages and sums the pagecounts. The
>      * second job sorts by pagecount descending then selects the top 50.
>      * @param args the input and output directories
>      * @throws IOException, InterruptedException
>      */
>
>     //configure the first job
>     Configuration conf1 = new Configuration();
>     String[] otherArgs1 = new GenericOptionsParser(conf1, args).getRemainingArgs();
>     if (otherArgs1.length != 2) {
>       System.err.println("Usage: programname <in> <out>");
>       System.exit(2);
>     }//end if args
>     Job job1 = new Job(conf1, "Wiki Counts part 1");
>     job1.setJarByClass(WikiCounts.class);
>     job1.setMapperClass(LineTokenMapper.class); //custom mapper job 1
>     job1.setCombinerClass(LongSumReducer.class);
>     job1.setReducerClass(LongSumReducer.class);
>
>     //Set reducers White p193
>     job1.setNumReduceTasks(32);
>     job1.setOutputKeyClass(Text.class);
>     job1.setOutputValueClass(LongWritable.class);
>
>     //Sequence File and Compression White p233.
>     job1.setOutputFormatClass(SequenceFileOutputFormat.class);
>     SequenceFileOutputFormat.setCompressOutput(job1, true);
>     SequenceFileOutputFormat.setOutputCompressorClass(job1, GzipCodec.class);
>     SequenceFileOutputFormat.setOutputCompressionType(job1, CompressionType.BLOCK);
>
>     FileInputFormat.addInputPath(job1, new Path(otherArgs1[0]));
>     FileOutputFormat.setOutputPath(job1, new Path("out1in2batch"));
>
>
>     //configure the second job
>     Configuration conf2 = new Configuration();
>     String[] otherArgs2 = new GenericOptionsParser(conf2, args).getRemainingArgs();
>     if (otherArgs2.length != 2) {
>       System.err.println("Usage: programname <in> <out>");
>       System.exit(2);
>     }//end if args
>     Job job2 = new Job(conf2, "Wiki Counts part 2");
>     job2.setJarByClass(WikiCounts.class);
>     job2.setInputFormatClass(SequenceFileInputFormat.class);
>     job2.setMapperClass(InverseMapper.class);
>     job2.setReducerClass(InverseTopReducer.class); //custom reducer job 2
>     job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
>     FileInputFormat.addInputPath(job2, new Path("out1in2batch"));
>     FileOutputFormat.setOutputPath(job2, new Path(otherArgs2[1]));
>
>     //run the jobs in order; stop if the first job fails
>     if (!job1.waitForCompletion(true)) {
>       System.exit(1);
>     }
>     System.exit(job2.waitForCompletion(true) ? 0 : 1);
>   }//end main
>
> }//end class
> ********************************************************************
>
> ----------------------------------------------------------------------
>
> Hello Henry,
>
> Per the older conversation, what Owen was pointing to were the new API
> Mapper/Reducer classes, and their run(…) method override specifically:
> http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/Reducer.html#run(org.apache.hadoop.mapreduce.Reducer.Context)
>
> You'll need to port your job to the new (still a bit unstable) API to
> leverage this. Here are some slides to aid you in that task:
> http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api (The
> first part, from Owen).
>
>
>
> On Thu, Mar 8, 2012 at 5:02 PM, Henry Helgen <hh...@gmail.com> wrote:
>>
>> I am using hadoop 0.20.2 mapreduce API. The program is running fine, just
>> slower than it could.
>>
>> I sum values and then use
>> job.setSortComparatorClass(LongWritable.DecreasingComparator.class) to sort
>> descending by sum. I need to stop the reducer after outputting the first N
>> records. This would save the reducer from running over thousands of records
>> when it only needs the first few records. Is there a solution with the new
>> mapreduce 0.20.2 API?
>>
>> -------------------------------------------------------------------
>> I notice messages from 2008 about this topic:
>>
>> http://grokbase.com/t/hadoop/common-user/089420wvkx/stop-mr-jobs-after-n-records-have-been-produced
>>
>> https://issues.apache.org/jira/browse/HADOOP-3973
>>
>> The last statement follows, but the link is broken.
>> "You could do this pretty easily by implementing a custom MapRunnable.
>> There is no equivalent for reduces. The interface proposed in
>> HADOOP-1230 would support that kind of application. See:
>>
>> http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
>> Look at the new Mapper and Reducer interfaces."
>>
>



-- 
Harsh J

Re: reduce stop after n records

Posted by Henry Helgen <hh...@gmail.com>.
Thanks, the presentation is helpful. I am using the new API's context
objects, but am not familiar with how to use the run method to accomplish
this. Do you have some code ideas or examples?

job code follows. Is this right?
-------------------------------------------------------------------------------------------
  public static void main(String[] args) throws Exception {
    /**
     * The main accepts the input and output directories as command line
     * parameters, then defines the job classes and submits two jobs.
     * The first job filters out languages and sums the pagecounts. The
     * second job sorts by pagecount descending then selects the top 50.
     * @param args the input and output directories
     * @throws IOException, InterruptedException
     */

    //configure the first job
    Configuration conf1 = new Configuration();
    String[] otherArgs1 = new GenericOptionsParser(conf1, args).getRemainingArgs();
    if (otherArgs1.length != 2) {
      System.err.println("Usage: programname <in> <out>");
      System.exit(2);
    }//end if args
    Job job1 = new Job(conf1, "Wiki Counts part 1");
    job1.setJarByClass(WikiCounts.class);
    job1.setMapperClass(LineTokenMapper.class); //custom mapper job 1
    job1.setCombinerClass(LongSumReducer.class);
    job1.setReducerClass(LongSumReducer.class);

    //Set reducers White p193
    job1.setNumReduceTasks(32);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(LongWritable.class);

    //Sequence File and Compression White p233.
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job1, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job1, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job1, CompressionType.BLOCK);

    FileInputFormat.addInputPath(job1, new Path(otherArgs1[0]));
    FileOutputFormat.setOutputPath(job1, new Path("out1in2batch"));


    //configure the second job
    Configuration conf2 = new Configuration();
    String[] otherArgs2 = new GenericOptionsParser(conf2, args).getRemainingArgs();
    if (otherArgs2.length != 2) {
      System.err.println("Usage: programname <in> <out>");
      System.exit(2);
    }//end if args
    Job job2 = new Job(conf2, "Wiki Counts part 2");
    job2.setJarByClass(WikiCounts.class);
    job2.setInputFormatClass(SequenceFileInputFormat.class);
    job2.setMapperClass(InverseMapper.class);
    job2.setReducerClass(InverseTopReducer.class); //custom reducer job 2
    job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
    FileInputFormat.addInputPath(job2, new Path("out1in2batch"));
    FileOutputFormat.setOutputPath(job2, new Path(otherArgs2[1]));

    //run the jobs in order; stop if the first job fails
    if (!job1.waitForCompletion(true)) {
      System.exit(1);
    }
    System.exit(job2.waitForCompletion(true) ? 0 : 1);
  }//end main

}//end class
********************************************************************
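
The custom InverseTopReducer used by job2 above is not included in the
message. A possible sketch, assuming the reducer receives LongWritable counts
as keys and Text page names as values (the output of InverseMapper) and a
hardcoded cutoff of 50:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical top-N reducer for job2: keys are counts (largest first,
// thanks to the descending sort comparator), values are page names.
// Output swaps them back to (page, count).
public class InverseTopReducer
    extends Reducer<LongWritable, Text, Text, LongWritable> {

  private static final int TOP_N = 50;  // assumed cutoff
  private int written = 0;              // records emitted so far

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey() && written < TOP_N) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }

  @Override
  protected void reduce(LongWritable count, Iterable<Text> pages,
      Context context) throws IOException, InterruptedException {
    for (Text page : pages) {
      if (written >= TOP_N) {
        return;  // stop mid-group once the cutoff is reached
      }
      context.write(page, count);
      written++;
    }
  }
}

Note that with more than one reduce task each task would emit its own top 50,
so a single global top 50 would also require job2.setNumReduceTasks(1).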

----------------------------------------------------------------------

Hello Henry,

Per the older conversation, what Owen was pointing to were the new API
Mapper/Reducer classes, and their run(…) method override specifically:
http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/Reducer.html#run(org.apache.hadoop.mapreduce.Reducer.Context)

You'll need to port your job to the new (still a bit unstable) API to
leverage this. Here are some slides to aid you in that task:
http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api (The
first part, from Owen).



On Thu, Mar 8, 2012 at 5:02 PM, Henry Helgen <hh...@gmail.com> wrote:

> I am using hadoop 0.20.2 mapreduce API. The program is running fine, just
> slower than it could.
>
> I sum values and then use
> job.setSortComparatorClass(LongWritable.DecreasingComparator.class) to sort
> descending by sum. I need to stop the reducer after outputting the first N
> records. This would save the reducer from running over thousands of records
> when it only needs the first few records. Is there a solution with the new
> mapreduce 0.20.2 API?
>
> -------------------------------------------------------------------
> I notice messages from 2008 about this topic:
>
> http://grokbase.com/t/hadoop/common-user/089420wvkx/stop-mr-jobs-after-n-records-have-been-produced
>
> https://issues.apache.org/jira/browse/HADOOP-3973
>
> The last statement follows, but the link is broken.
> "You could do this pretty easily by implementing a custom MapRunnable.
> There is no equivalent for reduces. The interface proposed in
> HADOOP-1230 would support that kind of application. See:
> http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
> Look at the new Mapper and Reducer interfaces."
>
>

Re: reduce stop after n records

Posted by Harsh J <ha...@cloudera.com>.
Hello Henry,

Per the older conversation, what Owen was pointing to were the new API
Mapper/Reducer classes, and their run(…) method override specifically:
http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/Reducer.html#run(org.apache.hadoop.mapreduce.Reducer.Context)

You'll need to port your job to the new (still a bit unstable) API to
leverage this. Here are some slides to aid you in that task:
http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api (The
first part, from Owen).
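
For reference, the default run(Context) in the 0.20.2 new-API Reducer is
essentially the loop below (paraphrased from the library source, not quoted
in the thread); overriding it is what allows a job to stop iterating early:

  // Default behavior: setup, iterate over every key, then cleanup.
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }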

On Fri, Mar 9, 2012 at 4:32 AM, Henry Helgen <hh...@gmail.com> wrote:

> I am using hadoop 0.20.2 mapreduce API. The program is running fine, just
> slower than it could.
>
> I sum values and then use
> job.setSortComparatorClass(LongWritable.DecreasingComparator.class) to sort
> descending by sum. I need to stop the reducer after outputting the first N
> records. This would save the reducer from running over thousands of records
> when it only needs the first few records. Is there a solution with the new
> mapreduce 0.20.2 API?
>
> -------------------------------------------------------------------
> I notice messages from 2008 about this topic:
>
> http://grokbase.com/t/hadoop/common-user/089420wvkx/stop-mr-jobs-after-n-records-have-been-produced
>
> https://issues.apache.org/jira/browse/HADOOP-3973
>
> The last statement follows, but the link is broken.
> "You could do this pretty easily by implementing a custom MapRunnable.
> There is no equivalent for reduces. The interface proposed in
> HADOOP-1230 would support that kind of application. See:
> http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
> Look at the new Mapper and Reducer interfaces."
>
>


-- 
Harsh J

Re: reduce stop after n records

Posted by Henry Helgen <hh...@gmail.com>.
Harsh,

Great. Thanks, I appreciate it. I'll try that.

Henry


Henry,

Something like:

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    // Do stuff here you'd do in setup(…) otherwise.
    // Now begin iterating.
    while (context.nextKey()) {
      // Run your reducing function here. Like the following maybe.
      reduce(context.getCurrentKey(), context.getValues(), context);
      // Since you are now in a regular loop, break as necessary
      // whenever you want. Your logic.
    }
    // Do stuff here you'd otherwise do in cleanup(context);
  }