Posted to mapreduce-user@hadoop.apache.org by Arsen Zahray <me...@gmail.com> on 2011/09/28 08:03:28 UTC
Type mismatch in key from map when replacing Mapper with MultithreadMapper
I'd like to implement a MultithreadedMapper for my MapReduce job.
To do this, I replaced Mapper with MultithreadedMapper in code that was already working.
Here's the exception I'm getting:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)
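Reading the trace bottom-up: the key that reached MapOutputBuffer.collect was written by Mapper.map, the base class's identity implementation, running inside MultithreadedMapper$MapRunner. As far as I know, MapTask in this Hadoop generation compares the emitted key's runtime class with the configured map output key class (which falls back to the job's output key class, IntWritable here) and throws on any mismatch. The toy below models only that check in plain Java; OutputBuffer and tryCollect are hypothetical names, Integer and Long stand in for IntWritable and LongWritable, and the message text only imitates Hadoop's format.

```java
import java.io.IOException;

// Toy model (not Hadoop code) of the exact-class key check behind
// "Type mismatch in key from map". Integer/Long stand in for IntWritable/LongWritable.
class KeyTypeCheck {

    // Hypothetical stand-in for a map-output buffer that knows the expected key class.
    static final class OutputBuffer {
        private final Class<?> expectedKeyClass;

        OutputBuffer(Class<?> expectedKeyClass) {
            this.expectedKeyClass = expectedKeyClass;
        }

        void collect(Object key, Object value) throws IOException {
            // Exact runtime-class comparison: any other class is rejected.
            if (key.getClass() != expectedKeyClass) {
                throw new IOException("Type mismatch in key from map: expected "
                        + expectedKeyClass.getName() + ", received "
                        + key.getClass().getName());
            }
            // A real buffer would serialize and store the pair here.
        }
    }

    // Returns the error message for the given key, or null if the key is accepted.
    static String tryCollect(Class<?> expectedKeyClass, Object key) {
        try {
            new OutputBuffer(expectedKeyClass).collect(key, "page");
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Key the user's map() intends to emit: an int hash, accepted.
        System.out.println(tryCollect(Integer.class, "http://example.com".hashCode()));
        // Key an identity mapper passes through: the long byte offset, rejected.
        System.out.println(tryCollect(Integer.class, 0L));
    }
}
```

The first call prints null and the second prints the mismatch message, which is the same shape of failure as in the trace.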
Here's the code setup:
public static void main(String[] args) {
    try {
        if (args.length != 2) {
            System.err.println("Usage: MapReduceMain <input path> <output path>");
            System.exit(123);
        }
        Job job = new Job();
        job.setJarByClass(MapReduceMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
        FileStatus[] files = fs.listStatus(new Path(args[0]));
        for (FileStatus sfs : files) {
            FileInputFormat.addInputPath(job, sfs.getPath());
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MyMultithreadMapper.class);
        job.setReducerClass(MyReducer.class);
        MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(MyPage.class);
        // write the result as a sequence file
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
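Note what the driver does not do: it never calls MultithreadedMapper.setMapperClass(job, ...). MultithreadedMapper overrides run() to start its threads, and each thread runs the configured inner mapper, so the map() overridden in MyMultithreadMapper is never called. When no inner class is configured, the lookup falls back to the base Mapper class, whose map() writes the LongWritable offset key straight through. The sketch below models that fallback with a plain Map; the configuration key and all class names are hypothetical, not Hadoop's.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not the Hadoop API) of resolving the inner mapper class with a default.
class InnerMapperLookup {

    // Minimal stand-in for a mapper: returns the key it would emit for one record.
    interface ToyMapper {
        Object mapKey(long offset, String line);
    }

    // Stand-in for the base Mapper, whose map() writes its input pair unchanged.
    static class IdentityMapper implements ToyMapper {
        @Override
        public Object mapKey(long offset, String line) {
            return offset; // the byte offset (boxed to Long) passes straight through
        }
    }

    // Stand-in for the user's MyMapper: emits an int hash of the URL.
    static class UrlHashMapper implements ToyMapper {
        @Override
        public Object mapKey(long offset, String line) {
            return line.hashCode();
        }
    }

    // Hypothetical key standing in for the one MultithreadedMapper.setMapperClass() writes.
    static final String INNER_MAPPER = "toy.inner.mapper.class";

    // Configured class if present, otherwise the identity mapper.
    static Class<? extends ToyMapper> innerMapperClass(Map<String, Class<? extends ToyMapper>> conf) {
        return conf.getOrDefault(INNER_MAPPER, IdentityMapper.class);
    }

    // Instantiates the resolved mapper and returns the key it emits for one record.
    static Object emittedKey(Map<String, Class<? extends ToyMapper>> conf, long offset, String line) {
        try {
            ToyMapper mapper = innerMapperClass(conf).getDeclaredConstructor().newInstance();
            return mapper.mapKey(offset, line);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate mapper", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Class<? extends ToyMapper>> conf = new HashMap<>();
        // Driver as posted: inner mapper never set, so the key is the Long offset.
        System.out.println(emittedKey(conf, 0L, "http://example.com").getClass().getSimpleName());
        // Equivalent of MultithreadedMapper.setMapperClass(job, MyMapper.class):
        conf.put(INNER_MAPPER, UrlHashMapper.class);
        System.out.println(emittedKey(conf, 0L, "http://example.com").getClass().getSimpleName());
    }
}
```

Running it prints Long and then Integer: only once the inner class is configured does the emitted key match the IntWritable-like type the job expects.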
And here's the mapper's code:
public class MyMultithreadMapper extends
        MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

    ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();
    public static final int nThreads = 5;

    public MyMultithreadMapper() {
        for (int i = 0; i < nThreads; i++) {
            scrapers.add(new MyScraper());
        }
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        MyScraper scraper = scrapers.poll();
        MyPage result = null;
        // try up to 10 times to scrape the page
        for (int i = 0; i < 10; i++) {
            try {
                result = scraper.scrapPage(value.toString(), true);
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (result == null) {
            result = new MyPage();
            result.setUrl(key.toString());
        }
        context.write(new IntWritable(result.getUrl().hashCode()), result);
        scrapers.add(scraper);
    }
}
Why the hell am I getting this?
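Separately from the exception, the scraper pool above has two weak spots: ConcurrentLinkedQueue.poll() returns null when every scraper is in use, and if context.write(...) throws, the scraper is never put back. A minimal sketch of a safer borrow/return pattern, assuming a fixed set of scrapers shared by several threads: BlockingQueue.take() waits for a free scraper, and a finally block always returns it. ScraperPool and runDemo are hypothetical helpers, and plain strings stand in for MyScraper.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical borrow/return pool; S stands in for MyScraper.
class ScraperPool<S> {
    private final BlockingQueue<S> idle;

    ScraperPool(List<S> scrapers) {
        // Capacity equals the number of scrapers, so returning one never blocks.
        this.idle = new ArrayBlockingQueue<>(scrapers.size(), false, scrapers);
    }

    // Borrows a scraper (waiting if all are in use), runs work, and always returns it.
    <R> R withScraper(Function<S, R> work) throws InterruptedException {
        S scraper = idle.take(); // waits instead of returning null like poll()
        try {
            return work.apply(scraper);
        } finally {
            idle.offer(scraper); // runs even if work throws
        }
    }

    int idleCount() {
        return idle.size();
    }

    // Runs 20 tasks on 5 threads sharing 2 scrapers; every third task fails.
    // Returns how many scrapers are idle afterwards.
    static int runDemo() {
        ScraperPool<String> pool = new ScraperPool<>(List.of("scraper-1", "scraper-2"));
        ExecutorService threads = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            final int page = i;
            threads.submit(() -> pool.withScraper(s -> {
                if (page % 3 == 0) {
                    throw new IllegalStateException(s + " failed on page " + page);
                }
                return s + " scraped page " + page;
            }));
        }
        threads.shutdown();
        try {
            threads.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.idleCount();
    }

    public static void main(String[] args) {
        System.out.println("idle scrapers after demo: " + runDemo());
    }
}
```

Both scrapers are back in the pool after the demo even though several tasks threw, which the poll()/add() version cannot guarantee.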
Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper
Posted by Arsen Zahray <me...@gmail.com>.
Ok. That works.
Thanks again!
On Wed, Sep 28, 2011 at 10:01 AM, Kamesh <ka...@imaginea.com> wrote:
> set the following property
> job.setMapperClass(MyMultithreadMapper.class);
>
> --
> *Thanks&Regards,*
> *Bh.V.S.Kamesh*
>
Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper
Posted by Kamesh <ka...@imaginea.com>.
On Wednesday 28 September 2011 12:25 PM, Arsen Zahray wrote:
> Should I set
> job.setMapperClass(MyMapper.class);
> or
> job.setMapperClass(MyMultithreadMapper.class);
> ?
Set the following property (the MultithreadedMapper subclass, not MyMapper):
job.setMapperClass(MyMultithreadMapper.class);
--
Thanks&Regards,
Bh.V.S.Kamesh
Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper
Posted by Arsen Zahray <me...@gmail.com>.
Hey! Thank you for replying!
Please confirm that I understand you correctly:
1. Use a class that extends Mapper:
class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // implement all logic here
    }
}
2. Leave the MultithreadedMapper subclass empty:
public class MyMultithreadMapper extends
        MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
}
3. In main, set:
MultithreadedMapper.setMapperClass(job, MyMapper.class);
Should I set job.setMapperClass(MyMapper.class) or
job.setMapperClass(MyMultithreadMapper.class)?
Arsen
On Wed, Sep 28, 2011 at 9:38 AM, Kamesh <ka...@imaginea.com> wrote:
> implement the map logic in a separate Mapper class and in the main method
> set the following property
> MultithreadedMapper.setMapperClass(job, MyMapper.class);
Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper
Posted by Kamesh <ka...@imaginea.com>.
On Wednesday 28 September 2011 11:33 AM, Arsen Zahray wrote:
> MultithreadMapper extends MultithreadedMapper<LongWritable, Text,
> IntWritable, MyPage> {
>
> ConcurrentLinkedQueue<MyScraper> scrapers = new
> ConcurrentLinkedQueue<MyScraper>();
>
> public static final int nThreads = 5;
>
> public MyMultithreadMapper() {
> for (int i = 0; i < nT
Implement the map logic in a separate Mapper class, and in the main method set the following property:
MultithreadedMapper.setMapperClass(job, MyMapper.class);
--
Thanks&Regards,
Bh.V.S.Kamesh
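To illustrate the split suggested in this reply: job.setMapperClass(...) names the outer MultithreadedMapper (or an empty subclass of it), and MultithreadedMapper.setMapperClass(job, MyMapper.class) names the class each thread runs. As far as I can tell from the Hadoop source of this era, each runner thread creates its own inner mapper instance by reflection while the threads share the input reader, so per-thread state such as a scraper could live in a MyMapper field instead of a shared queue. The model below uses plain Java threads; ToyMultithreadedRunner, ToyMapper, UrlHashMapper and Result are hypothetical names, and the queue stands in for the shared record reader.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model (not Hadoop's MultithreadedMapper) of outer-runner / inner-mapper delegation.
class ToyMultithreadedRunner {

    // Minimal stand-in for a Mapper: maps one input line to an output key.
    interface ToyMapper {
        Object map(String line);
    }

    // Stand-in for MyMapper. Each instance is used by one thread only, so
    // non-thread-safe state (like a scraper) can be an ordinary field.
    static class UrlHashMapper implements ToyMapper {
        private final StringBuilder scratch = new StringBuilder(); // per-instance, never shared

        @Override
        public Object map(String line) {
            scratch.setLength(0);
            scratch.append(line.trim());
            return scratch.toString().hashCode();
        }
    }

    // Emitted keys plus the number of inner mapper instances created.
    static final class Result {
        final List<Object> keys;
        final int mapperInstances;

        Result(List<Object> keys, int mapperInstances) {
            this.keys = keys;
            this.mapperInstances = mapperInstances;
        }
    }

    // Starts nThreads threads; each builds its own innerClass instance and
    // pulls lines from a shared queue until it is empty.
    static Result run(Class<? extends ToyMapper> innerClass, int nThreads, List<String> lines) {
        BlockingQueue<String> input = new LinkedBlockingQueue<>(lines);
        List<Object> keys = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        int instances = 0;
        try {
            for (int t = 0; t < nThreads; t++) {
                ToyMapper mapper = innerClass.getDeclaredConstructor().newInstance(); // one per thread
                instances++;
                Thread thread = new Thread(() -> {
                    String line;
                    while ((line = input.poll()) != null) {
                        keys.add(mapper.map(line));
                    }
                });
                threads.add(thread);
                thread.start();
            }
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate inner mapper", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return new Result(keys, instances);
    }

    public static void main(String[] args) {
        Result r = run(UrlHashMapper.class, 5,
                List.of("http://a.example", "http://b.example", "http://c.example"));
        System.out.println(r.keys.size() + " keys from " + r.mapperInstances + " mapper instances");
    }
}
```

Every line is mapped exactly once, and there are as many mapper instances as threads, which is why the shared ConcurrentLinkedQueue of scrapers becomes optional once the logic moves into MyMapper.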