Posted to common-user@hadoop.apache.org by Joe Schmoe <sa...@yahoo.com.INVALID> on 2015/12/03 17:25:10 UTC
MapReduce reducer not running (or using the identity reducer instead of the one I've specified)
I have set up a Hadoop job like so:

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Legion");
        job.setJarByClass(Legion.class);
        job.setMapperClass(CallQualityMap.class);
        job.setReducerClass(CallQualityReduce.class);

        // Explicitly configure map and reduce outputs, since they're different classes
        job.setMapOutputKeyClass(CallSampleKey.class);
        job.setMapOutputValueClass(CallSample.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(CombineRepublicInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        CombineRepublicInputFormat.setMaxInputSplitSize(job, 128000000);
        CombineRepublicInputFormat.setInputDirRecursive(job, true);
        CombineRepublicInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

The job completes, but something strange happens: I get one output line per input line, and each output line consists of the output of CallSampleKey.toString(), then a tab, then something like CallSample@17ab34d.

This looks as though the reduce phase never runs and the CallSampleKey and CallSample pairs are passed directly to TextOutputFormat. I don't understand why this would happen. I've explicitly called job.setReducerClass(CallQualityReduce.class), so I have no idea why the reducer would be skipped.

Here's the code for the mapper and reducer:

    public static class CallQualityMap extends Mapper<NullWritable, RepublicRecord, CallSampleKey, CallSample> {
        private CallSampleKey outKey = new CallSampleKey();
        private CallSample outValue = new CallSample();

        public void setup(Context context) {
            //Configuration config = context.getConfiguration();
        }

        public void map(NullWritable inKey, RepublicRecord inValue, Context context) throws IOException, InterruptedException {
            outKey.setData(inValue.getData("call_id"), inValue.getData("file_name_uuid"));
            outValue.setData(
                    Long.valueOf(inValue.getData("timestamp")),
                    Float.valueOf(inValue.getData("current_expand_rate")),
                    Float.valueOf(inValue.getData("current_buffer_size")),
                    inValue.getData("wifi_bssid")
            );
            context.write(outKey, outValue);
        }
    }

    public static class CallQualityReduce extends Reducer<CallSampleKey, CallSample, NullWritable, Text> {
        public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context) throws IOException, InterruptedException {
            Call call = new Call(inKey.getId().toString(), inKey.getUuid().toString());
            while (inValues.hasNext()) {
                call.addSample(inValues.next());
            }
            context.write(NullWritable.get(), new Text(call.getStats()));
        }
    }

Here's the controller and syslog output:

https://s3.amazonaws.com/rw-hadoop-test/logs/j-39S8C9S340U8I/steps/s-ORYZ0393MEBG/controller.gz
https://s3.amazonaws.com/rw-hadoop-test/logs/j-39S8C9S340U8I/steps/s-ORYZ0393MEBG/syslog.gz
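For reference, here is a simplified plain-Java sketch (no Hadoop dependency) of the output line shape described above. TextOutputFormat writes each record as the key's string form, a separator (a tab by default), and the value's string form. A value class that does not override toString() falls back to Object.toString(), which yields ClassName@hexHashCode, much like CallSample@17ab34d. SampleKey, Sample, and formatLine are hypothetical stand-ins, not Hadoop classes, and the sketch ignores TextOutputFormat's special handling of null and NullWritable.

```java
import java.util.Objects;

/**
 * Simplified sketch of how one TextOutputFormat record becomes a line:
 * key text, separator, value text. SampleKey and Sample are hypothetical
 * stand-ins for CallSampleKey and CallSample.
 */
public class OutputLineDemo {

    // Stand-in key that overrides toString(), as CallSampleKey does.
    static final class SampleKey {
        private final String callId;
        private final String uuid;

        SampleKey(String callId, String uuid) {
            this.callId = callId;
            this.uuid = uuid;
        }

        @Override
        public String toString() {
            return callId + ":" + uuid;
        }
    }

    // Stand-in value that does NOT override toString(), so Object.toString()
    // is used and produces "ClassName@hexHashCode".
    static final class Sample {
        final long timestamp;

        Sample(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Mimics the line shape: key text, separator, value text.
    static String formatLine(Object key, Object value, String separator) {
        return Objects.toString(key) + separator + Objects.toString(value);
    }

    public static void main(String[] args) {
        SampleKey key = new SampleKey("call-1", "uuid-1");
        Sample value = new Sample(1449159910L);
        // Prints something like call-1:uuid-1<TAB>OutputLineDemo$Sample@1b6d3586
        // (the hash part varies from run to run).
        System.out.println(formatLine(key, value, "\t"));
    }
}
```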
Any advice on how to proceed here?
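One thing worth checking, sketched here in plain Java with no Hadoop dependency: in the new org.apache.hadoop.mapreduce API, Reducer.reduce takes an Iterable<VALUEIN>, and its default implementation writes every value through unchanged. A reduce method declared with Iterator<CallSample> (the parameter type used by the old org.apache.hadoop.mapred API) overloads that method rather than overriding it, so the framework would still call the inherited identity version, which matches the symptom above. BaseReducer, IteratorReducer, and IterableReducer are hypothetical, simplified stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Sketch of overload vs. override. BaseReducer is a hypothetical, simplified
 * stand-in for a framework base class whose default reduce() is an identity
 * pass-through, and whose run() always calls reduce(String, Iterable).
 */
public class OverrideDemo {

    static class BaseReducer {
        final List<String> output = new ArrayList<>();

        // Default behaviour: identity, one output record per input value.
        protected void reduce(String key, Iterable<String> values) {
            for (String value : values) {
                output.add(key + "\t" + value);
            }
        }

        // The "framework" only ever calls the Iterable signature.
        void run(String key, List<String> values) {
            reduce(key, values);
        }
    }

    // Iterator instead of Iterable: this is an overload, not an override,
    // so run() never calls it. Annotating it with @Override would make the
    // compiler reject it instead of silently ignoring it.
    static class IteratorReducer extends BaseReducer {
        protected void reduce(String key, Iterator<String> values) {
            int count = 0;
            while (values.hasNext()) {
                values.next();
                count++;
            }
            output.add(key + "\tcount=" + count);
        }
    }

    // Matching signature: this really overrides the base method.
    static class IterableReducer extends BaseReducer {
        @Override
        protected void reduce(String key, Iterable<String> values) {
            int count = 0;
            for (String ignored : values) {
                count++;
            }
            output.add(key + "\tcount=" + count);
        }
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("s1", "s2", "s3");

        BaseReducer wrong = new IteratorReducer();
        wrong.run("call-1", values);
        // 3 records, one per input value: the inherited identity reduce ran.
        System.out.println(wrong.output.size() + " records: " + wrong.output);

        BaseReducer right = new IterableReducer();
        right.run("call-1", values);
        // 1 record: call-1<TAB>count=3
        System.out.println(right.output.size() + " records: " + right.output);
    }
}
```

Adding @Override to every reduce and map method is a cheap way to have the compiler catch this kind of signature mismatch.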