Posted to mapreduce-dev@hadoop.apache.org by "Arsen Zahray (Created) (JIRA)" <ji...@apache.org> on 2011/09/27 09:29:13 UTC

[jira] [Created] (MAPREDUCE-3106) Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"

Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map" 
-------------------------------------------------------------------------------------------------------

                 Key: MAPREDUCE-3106
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
             Project: Hadoop Map/Reduce
          Issue Type: Bug
    Affects Versions: 0.20.203.0
            Reporter: Arsen Zahray
            Priority: Blocker


I have a Hadoop job which works perfectly fine with a class extending Mapper. When I replace Mapper with MultithreadedMapper, the job crashes with the following message:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
	at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
	at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

Here is the relevant source code:

public class MapReduceMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			if (args.length != 2) {
				System.err.println("Usage: MapReduceMain <input path> <output path>");
				System.exit(123);
			}
			Job job = new Job();
			job.setJarByClass(MapReduceMain.class);
			job.setInputFormatClass(TextInputFormat.class);
			FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
			FileStatus[] files = fs.listStatus(new Path(args[0]));
			for(FileStatus sfs:files){
				FileInputFormat.addInputPath(job, sfs.getPath());
			}
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			
			job.setMapperClass(MyMultithreadMapper.class);
			job.setReducerClass(MyReducer.class);
			MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

			job.setOutputKeyClass(IntWritable.class); 
			job.setOutputValueClass(MyPage.class);
			
			job.setOutputFormatClass(SequenceFileOutputFormat.class);
			
			System.exit(job.waitForCompletion(true) ? 0 : 1);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
}

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

	ConcurrentLinkedQueue<MyScraper>	scrapers	= new ConcurrentLinkedQueue<MyScraper>();

	public static final int				nThreads	= 5;

	public MyMultithreadMapper() {
		for (int i = 0; i < nThreads; i++) {
			scrapers.add(new MyScraper());
		}
	}

	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		MyScraper scraper = scrapers.poll();

		MyPage result = null;
		for (int i = 0; i < 10; i++) {
			try {
				result = scraper.scrapPage(value.toString(), true);
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		if (result == null) {
			result = new MyPage();
			result.setUrl(key.toString());
		}

		context.write(new IntWritable(result.getUrl().hashCode()), result);

		scrapers.add(scraper);
	}
}


And here is the code for the working mapper class, for comparison:

public class MyMapper extends Mapper<LongWritable, Text, IntWritable,MyPage> {
	MyScraper scr = new MyScraper();
	
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		MyPage result =null;
		for(int i=0;i<10;i++){
			try{
				result = scr.scrapPage(value.toString(), true);
				break;
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		
		if(result==null){
			result = new MyPage();
			result.setUrl(key.toString());
		}
		
		context.write(new IntWritable(result.getUrl().hashCode()),result);
	}
}


This appears to be a Hadoop bug.
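A hedged reading of the stack trace (not part of the original report): the frame org.apache.hadoop.mapreduce.Mapper.map is the identity Mapper. MultithreadedMapper does not call an overridden map() on a subclass; its run() delegates to an inner mapper class configured via MultithreadedMapper.setMapperClass(), which defaults to the identity Mapper — and the identity Mapper passes the LongWritable input key straight through, which would produce exactly this type-mismatch error. Under that assumption, the conventional driver wiring would look roughly like this (DriverSketch is a hypothetical name; MyMapper and MyPage are the classes from the report):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class DriverSketch {
	public static void configure(Job job) {
		// Register MultithreadedMapper itself as the job's mapper class;
		// the actual map logic does NOT go in a MultithreadedMapper subclass.
		job.setMapperClass(MultithreadedMapper.class);

		// Hand the real (single-threaded) mapper to MultithreadedMapper.
		// Each worker thread gets its own MyMapper instance, but shared
		// state such as counters must still be thread-safe.
		MultithreadedMapper.setMapperClass(job, MyMapper.class);
		MultithreadedMapper.setNumberOfThreads(job, 5);

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(MyPage.class);
	}
}
```

With this wiring, MyMapper (the plain Mapper shown above, which already works single-threaded) would run on multiple threads, and the IntWritable keys it emits would match the declared output key class.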

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Resolved] (MAPREDUCE-3106) Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"

Posted by "Arsen Zahray (Resolved) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arsen Zahray resolved MAPREDUCE-3106.
-------------------------------------

    Resolution: Won't Fix
    
