Posted to mapreduce-dev@hadoop.apache.org by "Arsen Zahray (Resolved) (JIRA)" <ji...@apache.org> on 2011/09/28 12:13:45 UTC

[jira] [Resolved] (MAPREDUCE-3106) Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"

     [ https://issues.apache.org/jira/browse/MAPREDUCE-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arsen Zahray resolved MAPREDUCE-3106.
-------------------------------------

    Resolution: Won't Fix
    
> Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map" 
> -------------------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-3106
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 0.20.203.0
>            Reporter: Arsen Zahray
>            Priority: Blocker
>
> I have a Hadoop job that works perfectly well when its mapper is a class extending Mapper. When I replace Mapper with MultithreadedMapper, the job crashes with the following message:
> java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
> 	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
> 	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
> 	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> 	at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
> 	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> 	at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
> 	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> 	at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)
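> Here is the relevant source code:
> import java.net.URI;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
>
> public class MapReduceMain {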
> 	/**
> 	 * @param args
> 	 */
> 	public static void main(String[] args) {
> 		try {
> 			if (args.length != 2) {
> 				System.err.println("Usage: MapReduceMain <input path> <output path>");
> 				System.exit(123);
> 			}
> 			Job job = new Job();
> 			job.setJarByClass(MapReduceMain.class);
> 			job.setInputFormatClass(TextInputFormat.class);
> 			FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
> 			FileStatus[] files = fs.listStatus(new Path(args[0]));
> 			for(FileStatus sfs:files){
> 				FileInputFormat.addInputPath(job, sfs.getPath());
> 			}
> 			FileOutputFormat.setOutputPath(job, new Path(args[1]));
> 			
> 			job.setMapperClass(MyMultithreadMapper.class);
> 			job.setReducerClass(MyReducer.class);
> 			MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);
> 			job.setOutputKeyClass(IntWritable.class); 
> 			job.setOutputValueClass(MyPage.class);
> 			
> 			job.setOutputFormatClass(SequenceFileOutputFormat.class);
> 			
> 			System.exit(job.waitForCompletion(true) ? 0 : 1);
> 		} catch (Exception e) {
> 			e.printStackTrace();
> 		}
> 	}
> }
> public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
> 	ConcurrentLinkedQueue<MyScraper>	scrapers	= new ConcurrentLinkedQueue<MyScraper>();
> 	public static final int				nThreads	= 5;
> 	public MyMultithreadMapper() {
> 		for (int i = 0; i < nThreads; i++) {
> 			scrapers.add(new MyScraper());
> 		}
> 	}
> 	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
> 		MyScraper scraper = scrapers.poll();
> 		MyPage result = null;
> 		for (int i = 0; i < 10; i++) {
> 			try {
> 				result = scraper.scrapPage(value.toString(), true);
> 				break;
> 			} catch (Exception e) {
> 				e.printStackTrace();
> 			}
> 		}
> 		if (result == null) {
> 			result = new MyPage();
> 			result.setUrl(key.toString());
> 		}
> 		context.write(new IntWritable(result.getUrl().hashCode()), result);
> 		scrapers.add(scraper);
> 	}
> }
> And here is the code for the working mapper class, for comparison:
> public class MyMapper extends Mapper<LongWritable, Text, IntWritable,MyPage> {
> 	MyScraper scr = new MyScraper();
> 	
> 	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
> 		MyPage result =null;
> 		for(int i=0;i<10;i++){
> 			try{
> 				result = scr.scrapPage(value.toString(), true);
> 				break;
> 			}catch(Exception e){
> 				e.printStackTrace();
> 			}
> 		}
> 		
> 		if(result==null){
> 			result = new MyPage();
> 			result.setUrl(key.toString());
> 		}
> 		
> 		context.write(new IntWritable(result.getUrl().hashCode()),result);
> 	}
> }
> This appears to be a Hadoop bug.
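
The ticket does not record why it was resolved as Won't Fix. One reading that is consistent with the stack trace above: the frames show the base Mapper.map being called from MultithreadedMapper$MapRunner.run, which suggests that the worker threads run a separately configured delegate mapper rather than the map() method overridden on the MultithreadedMapper subclass. If no delegate is configured, the identity Mapper passes the LongWritable input key straight through. The following is a minimal, Hadoop-free sketch of that delegation pattern, not Hadoop's actual implementation. All class and method names in it (DelegationSketch, SimpleMapper, HashingMapper, ThreadedWrapper, OverridingWrapper, emittedKeyType) are hypothetical stand-ins invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DelegationSketch {

    /** Hypothetical stand-in for Mapper: the default map() is the identity on the key. */
    static class SimpleMapper {
        void map(Object key, String value, List<Object> out) {
            out.add(key);
        }
    }

    /** Hypothetical stand-in for a user mapper that emits an Integer key. */
    static class HashingMapper extends SimpleMapper {
        @Override
        void map(Object key, String value, List<Object> out) {
            out.add(Integer.valueOf(value.hashCode()));
        }
    }

    /** Hypothetical stand-in for a threaded wrapper: run() calls the delegate, never this.map(). */
    static class ThreadedWrapper extends SimpleMapper {
        private final Supplier<SimpleMapper> delegateFactory;

        ThreadedWrapper(Supplier<SimpleMapper> delegateFactory) {
            this.delegateFactory = delegateFactory;
        }

        List<Object> run(long key, String value) {
            List<Object> out = new ArrayList<>();
            // The worker builds its own delegate instance and maps with it.
            Thread worker = new Thread(() -> delegateFactory.get().map(key, value, out));
            worker.start();
            try {
                worker.join(); // join() makes the worker's writes to 'out' visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return out;
        }
    }

    /** Mirrors the report: map() is overridden on the wrapper, the delegate stays the identity. */
    static class OverridingWrapper extends ThreadedWrapper {
        OverridingWrapper() {
            super(SimpleMapper::new);
        }

        @Override
        void map(Object key, String value, List<Object> out) {
            out.add(Integer.valueOf(value.hashCode())); // never reached through run()
        }
    }

    /** Returns the simple class name of the key that the wrapper emitted. */
    static String emittedKeyType(ThreadedWrapper wrapper) {
        return wrapper.run(0L, "http://example.org").get(0).getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Overridden map() is ignored, so the Long input key passes through.
        System.out.println(emittedKeyType(new OverridingWrapper()));                 // prints "Long"
        // With a delegate configured, the Integer key is emitted as intended.
        System.out.println(emittedKeyType(new ThreadedWrapper(HashingMapper::new))); // prints "Integer"
    }
}
```

If this reading is right, the equivalent change in the driver above would be to keep the plain MyMapper class, call job.setMapperClass(MultithreadedMapper.class), and register the real mapper with MultithreadedMapper.setMapperClass(job, MyMapper.class) alongside the existing setNumberOfThreads call. That suggestion is an assumption drawn from the stack trace and is not confirmed anywhere in the ticket.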
