You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-user@hadoop.apache.org by Bejoy KS <be...@gmail.com> on 2011/09/06 14:16:51 UTC

Hadoop Mapreduce 0.20 - reduce job not executing as desired

Hi Experts
          I was working on  Hadoop mapreduce  0.18 API for some time. Now I
just tried to migrate some existing application to hadoop mapreduce 0.20
API. But after the migration, It seems like the reduce logic is not working.
Map output records and reduce output records show the same number on console
even though i have an aggregation operation on reducer. I tried to migrate
the simple word count example to 0.20 API to debug, but still the same
issue. I feel I'm missing something but still not able to locate what it is.
Please share your thoughts, given below are the sample codes.

*Mapper*

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text,
IntWritable>
{
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

           public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
           {
               String line = value.toString();
               StringTokenizer tokenizer = new StringTokenizer(line);

             //iterating through all the words available in that line and
forming the key value pair
               while (tokenizer.hasMoreTokens())
               {
                  word.set(tokenizer.nextToken());
                  //sending to output collector which inturn passes the same
to reducer
                  context.write(word, one);
               }
           }
 }

*Reducer*

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text,
IntWritable>
{
      //Reduce method for just outputting the key from mapper as the value
from mapper is just an empty string
      public void reduce(Text key, Iterator<Text> values, Context context)
throws IOException, InterruptedException
      {
            int sum = 0;
            /*iterates through all the values available with a key and add
them together and give the
            final result as the key and sum of its values*/
            while (values.hasNext())
            {
                  sum += 1;
                  values.next();
            }
            context.write(key, new IntWritable(sum));
       }
}

*Driver Class*

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



public class WordCount extends Configured implements Tool
{
      public int run(String[] args) throws Exception
      {
            //getting configuration object and setting job name
            Configuration conf = getConf();
            Job job = new Job(conf, "Word Count hadoop-0.20");

            //setting the class names
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);

            //setting the output data type classes
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            //to accept the hdfs input and outpur dir at run time
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(),
args);
        System.exit(res);
    }
}

Re: Hadoop Mapreduce 0.20 - reduce job not executing as desired

Posted by Harsh J <ha...@cloudera.com>.
This has come up before. Its a code error of using Iterator instead of
Iterable. Please see
http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201107.mbox/%3CCAOcnVr3tdEt9_+PxqcJXUPEygTe7R7-4kV34tA5MBhDqyOX1+A@mail.gmail.com%3E

P.s. Where did you pick up the new reduce(…) signature in docs? Maybe
everyone's reading a faulty docs somewhere and that's why there's so
many reports of this user-error.

On Tue, Sep 6, 2011 at 5:46 PM, Bejoy KS <be...@gmail.com> wrote:
> Hi Experts
>           I was working on  Hadoop mapreduce  0.18 API for some time. Now I
> just tried to migrate some existing application to hadoop mapreduce 0.20
> API. But after the migration, It seems like the reduce logic is not working.
> Map output records and reduce output records show the same number on console
> even though i have an aggregation operation on reducer. I tried to migrate
> the simple word count example to 0.20 API to debug, but still the same
> issue. I feel I'm missing something but still not able to locate what it is.
> Please share your thoughts, given below are the sample codes.
>
> Mapper
>
> import java.io.IOException;
> import java.util.StringTokenizer;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.Mapper;
>
> public class WordCountMapper extends Mapper<LongWritable, Text, Text,
> IntWritable>
> {
>       private final static IntWritable one = new IntWritable(1);
>       private Text word = new Text();
>
>            public void map(LongWritable key, Text value, Context context)
> throws IOException, InterruptedException
>            {
>                String line = value.toString();
>                StringTokenizer tokenizer = new StringTokenizer(line);
>
>              //iterating through all the words available in that line and
> forming the key value pair
>                while (tokenizer.hasMoreTokens())
>                {
>                   word.set(tokenizer.nextToken());
>                   //sending to output collector which inturn passes the same
> to reducer
>                   context.write(word, one);
>                }
>            }
>  }
>
> Reducer
>
> import java.io.IOException;
> import java.util.Iterator;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.Reducer;
>
> public class WordCountReducer extends Reducer<Text, IntWritable, Text,
> IntWritable>
> {
>       //Reduce method for just outputting the key from mapper as the value
> from mapper is just an empty string
>       public void reduce(Text key, Iterator<Text> values, Context context)
> throws IOException, InterruptedException
>       {
>             int sum = 0;
>             /*iterates through all the values available with a key and add
> them together and give the
>             final result as the key and sum of its values*/
>             while (values.hasNext())
>             {
>                   sum += 1;
>                   values.next();
>             }
>             context.write(key, new IntWritable(sum));
>        }
> }
>
> Driver Class
>
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
>
>
> public class WordCount extends Configured implements Tool
> {
>       public int run(String[] args) throws Exception
>       {
>             //getting configuration object and setting job name
>             Configuration conf = getConf();
>             Job job = new Job(conf, "Word Count hadoop-0.20");
>
>             //setting the class names
>             job.setJarByClass(WordCount.class);
>             job.setMapperClass(WordCountMapper.class);
>             job.setReducerClass(WordCountReducer.class);
>
>             //setting the output data type classes
>             job.setOutputKeyClass(Text.class);
>             job.setOutputValueClass(IntWritable.class);
>
>             //to accept the hdfs input and outpur dir at run time
>             FileInputFormat.addInputPath(job, new Path(args[0]));
>             FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>             return job.waitForCompletion(true) ? 0 : 1;
>     }
>
>     public static void main(String[] args) throws Exception {
>         int res = ToolRunner.run(new Configuration(), new WordCount(),
> args);
>         System.exit(res);
>     }
> }
>
>



-- 
Harsh J