Posted to mapreduce-issues@hadoop.apache.org by "Sathish (JIRA)" <ji...@apache.org> on 2013/06/28 19:16:20 UTC

[jira] [Created] (MAPREDUCE-5361) Reducer getting fewer records than expected

Sathish created MAPREDUCE-5361:
----------------------------------

             Summary: Reducer getting fewer records than expected
                 Key: MAPREDUCE-5361
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5361
             Project: Hadoop Map/Reduce
          Issue Type: Bug
            Reporter: Sathish


Hi All,

We have a scenario that requires generating a unique key for every single row in a file. We have a timestamp column, but in some cases there are multiple rows with the same timestamp.
We decided the unique value would be the timestamp appended with its per-timestamp count, as in the program below.
The mapper simply emits the timestamp as the key and the entire row as its value, and the reducer generates the unique key.
The problem is that the map outputs about 236 records, of which only 230 are fed as input to the reducer, which in turn outputs the same 230 records.
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;

public class UniqueKeyGenerator extends Configured implements Tool {

    private static final String SEPERATOR = "\t";
    private static final int TIME_INDEX = 10;
    private static final String COUNT_FORMAT_DIGITS = "%010d";

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text row, Context context)
                throws IOException, InterruptedException {
            String input = row.toString();
            String[] vals = input.split(SEPERATOR);
            // Rows whose split yields fewer than TIME_INDEX fields are silently skipped here
            if (vals != null && vals.length >= TIME_INDEX) {
                context.write(new Text(vals[TIME_INDEX - 1]), row);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        protected void reduce(Text eventTimeKey,
                Iterable<Text> timeGroupedRows, Context context)
                throws IOException, InterruptedException {
            int cnt = 1;
            final String eventTime = eventTimeKey.toString();
            for (Text val : timeGroupedRows) {
                final String res = SEPERATOR.concat(getDate(
                        Long.valueOf(eventTime)).concat(
                        String.format(COUNT_FORMAT_DIGITS, cnt)));
                val.append(res.getBytes(), 0, res.length());
                cnt++;
                context.write(NullWritable.get(), val);
            }
        }
    }

    public static String getDate(long time) {
        // Note: "hh" is the 12-hour clock ("HH" would be 24-hour), so morning
        // and afternoon timestamps can format to the same string here
        SimpleDateFormat utcSdf = new SimpleDateFormat("yyyyMMddhhmmss");
        utcSdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
        return utcSdf.format(new Date(time));
    }

    public int run(String[] args) throws Exception {
        conf(args);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        conf(args);
    }

    private static void conf(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "uniquekeygen");
        job.setJarByClass(UniqueKeyGenerator.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // job.setNumReduceTasks(400);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}
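
One thing worth ruling out (a hypothesis, not a confirmed diagnosis of the missing records): Java's String.split(String) discards trailing empty strings, so a row ending in empty tab-separated columns reports fewer fields than it actually contains and fails the vals.length >= TIME_INDEX guard in the mapper. A minimal sketch of that behavior:

```java
// Sketch: String.split(String) drops trailing empty strings, which can make
// a valid row fail a field-count check like vals.length >= TIME_INDEX.
public class SplitCheck {
    public static void main(String[] args) {
        // 11 tab-separated columns; the last two are empty.
        String row = "a\tb\tc\td\te\tf\tg\th\ti\t\t";
        System.out.println(row.split("\t").length);      // 9  -- trailing empties dropped
        System.out.println(row.split("\t", -1).length);  // 11 -- a limit of -1 keeps them
    }
}
```

Passing a negative limit (split("\t", -1)) preserves trailing empty fields if that turns out to be the issue.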

The discrepancy is consistent for higher numbers of lines, and the difference is as large as 208,969 records for an input of 20,855,982 lines. What might be the reason for the reduced input to the reducer?

Many Thanks,
Sathish.


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira