Posted to common-user@hadoop.apache.org by Joe Schmoe <sa...@yahoo.com.INVALID> on 2015/12/03 17:25:10 UTC

MapReduce reducer not running (or using the identity reducer instead of the one I've specified)

I have set up a Hadoop job like so:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "Legion");
    job.setJarByClass(Legion.class);

    job.setMapperClass(CallQualityMap.class);
    job.setReducerClass(CallQualityReduce.class);

    // Explicitly configure map and reduce outputs, since they're different classes
    job.setMapOutputKeyClass(CallSampleKey.class);
    job.setMapOutputValueClass(CallSample.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(CombineRepublicInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    CombineRepublicInputFormat.setMaxInputSplitSize(job, 128000000);
    CombineRepublicInputFormat.setInputDirRecursive(job, true);
    CombineRepublicInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

This job completes, but something strange happens: I get one output line per input line. Each output line consists of the output of CallSampleKey.toString(), then a tab, then something like CallSample@17ab34d.

This suggests that my reduce phase never runs and the CallSampleKey and CallSample pairs are passed directly to TextOutputFormat. I don't understand why this would be the case. I've explicitly specified job.setReducerClass(CallQualityReduce.class), so I have no idea why the reducer is being skipped.

Here's the code for the mapper and reducer:

public static class CallQualityMap extends Mapper<NullWritable, RepublicRecord, CallSampleKey, CallSample> {

    private CallSampleKey outKey = new CallSampleKey();
    private CallSample outValue = new CallSample();

    public void setup(Context context) {
        //Configuration config = context.getConfiguration();
    }

    public void map(NullWritable inKey, RepublicRecord inValue, Context context) throws IOException, InterruptedException {
        outKey.setData(inValue.getData("call_id"), inValue.getData("file_name_uuid"));

        outValue.setData(
            Long.valueOf(inValue.getData("timestamp")),
            Float.valueOf(inValue.getData("current_expand_rate")),
            Float.valueOf(inValue.getData("current_buffer_size")),
            inValue.getData("wifi_bssid")
        );

        context.write(outKey, outValue);
    }

}


public static class CallQualityReduce extends Reducer<CallSampleKey, CallSample, NullWritable, Text> {

    public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context) throws IOException, InterruptedException {
        Call call = new Call(inKey.getId().toString(), inKey.getUuid().toString());

        while (inValues.hasNext()) {
            call.addSample(inValues.next());
        }

        context.write(NullWritable.get(), new Text(call.getStats()));
    }
}

Here's the controller and syslog output:

https://s3.amazonaws.com/rw-hadoop-test/logs/j-39S8C9S340U8I/steps/s-ORYZ0393MEBG/controller.gz
https://s3.amazonaws.com/rw-hadoop-test/logs/j-39S8C9S340U8I/steps/s-ORYZ0393MEBG/syslog.gz
Any advice on how to proceed here?
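A plausible explanation, offered as a hedged sketch rather than a confirmed diagnosis: in the new org.apache.hadoop.mapreduce API, Reducer.reduce takes an Iterable<VALUEIN>, not an Iterator, and its default implementation writes every value back out with its key. A reduce method declared with Iterator<CallSample> would then be an overload that the framework never calls, which matches the one-line-per-input output described in the post. The self-contained Java below reproduces that pattern without Hadoop; BaseReducer, IteratorReducer, IterableReducer and OverrideDemo are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Hadoop's Reducer (assumption, not the real class):
// its default reduce() takes an Iterable and passes every value through unchanged.
abstract class BaseReducer<K, V> {
    final List<String> output = new ArrayList<>();

    protected void reduce(K key, Iterable<V> values) {
        for (V value : values) {
            output.add(key + "\t" + value); // identity behaviour: one output line per input value
        }
    }

    // The "framework" only ever calls the Iterable version.
    void run(K key, List<V> values) {
        reduce(key, values);
    }
}

// Mirrors CallQualityReduce: the Iterator parameter makes this an overload, not an
// override, so run() never reaches it. Annotating it with @Override would make javac
// reject it ("method does not override or implement a method from a supertype").
class IteratorReducer extends BaseReducer<String, Integer> {
    public void reduce(String key, Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        output.add(key + "\t" + sum);
    }
}

// Same logic with an Iterable parameter: this one really overrides the base method.
class IterableReducer extends BaseReducer<String, Integer> {
    @Override
    protected void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        output.add(key + "\t" + sum);
    }
}

public class OverrideDemo {
    public static void main(String[] args) {
        BaseReducer<String, Integer> wrong = new IteratorReducer();
        wrong.run("call-1", Arrays.asList(1, 2, 3));
        System.out.println(wrong.output.size()); // three entries: the base identity reduce ran

        BaseReducer<String, Integer> right = new IterableReducer();
        right.run("call-1", Arrays.asList(1, 2, 3));
        System.out.println(right.output.size()); // one entry holding the summed value
    }
}
```

If this is the cause, changing the parameter to Iterable<CallSample> (iterating with a for-each loop) and adding @Override would let the compiler catch any remaining signature mismatch. The CallSample@17ab34d text is consistent with CallSample not overriding toString(), since TextOutputFormat writes values using their toString() output.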