Posted to mapreduce-user@hadoop.apache.org by Arsen Zahray <me...@gmail.com> on 2011/09/28 08:03:28 UTC

Type mismatch in key from map when replacing Mapper with MultithreadMapper

I'd like to implement a multithreaded mapper for my MapReduce job.

For this I replaced Mapper with MultithreadedMapper in working code.

Here's the exception I'm getting:

    java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

Here's the code setup:

    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.err.println("Usage: MapReduceMain <input path> <output path>");
                System.exit(123);
            }
            Job job = new Job();
            job.setJarByClass(MapReduceMain.class);
            job.setInputFormatClass(TextInputFormat.class);
            FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
            FileStatus[] files = fs.listStatus(new Path(args[0]));
            for (FileStatus sfs : files) {
                FileInputFormat.addInputPath(job, sfs.getPath());
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MyMultithreadMapper.class);
            job.setReducerClass(MyReducer.class);
            MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(MyPage.class);

            job.setOutputFormatClass(SequenceFileOutputFormat.class); // write the result as a sequence file

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

And here's the mapper's code:

    public class MyMultithreadMapper extends
            MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

        ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();

        public static final int nThreads = 5;

        public MyMultithreadMapper() {
            for (int i = 0; i < nThreads; i++) {
                scrapers.add(new MyScraper());
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            MyScraper scraper = scrapers.poll();

            MyPage result = null;
            for (int i = 0; i < 10; i++) {
                try {
                    result = scraper.scrapPage(value.toString(), true);
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            if (result == null) {
                result = new MyPage();
                result.setUrl(key.toString());
            }

            context.write(new IntWritable(result.getUrl().hashCode()), result);

            scrapers.add(scraper);
        }
    }

Why the hell am I getting this?
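Reading the trace from the bottom up: MultithreadedMapper$MapRunner.run calls org.apache.hadoop.mapreduce.Mapper.run and Mapper.map, the base class's identity map(), not the map() defined in MyMultithreadMapper. MultithreadedMapper overrides run() to start worker threads, and each thread instantiates the class registered with MultithreadedMapper.setMapperClass, falling back to the plain Mapper when none is registered. That identity mapper writes TextInputFormat's LongWritable byte offset as the key, while job.setOutputKeyClass(IntWritable.class) also fixes the expected map output key class (the driver makes no separate setMapOutputKeyClass call), so the collector rejects the record. The plain-Java model below reproduces only that control flow, on a single thread; BaseMapper, OutputBuffer, ThreadedWrapper and the other classes are hypothetical stand-ins, not Hadoop APIs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Single-threaded, plain-Java model of the trace; every class here is a hypothetical stand-in. */
public class MismatchModel {

    /** Stand-in for Mapper: run() feeds each record to map(), whose default is the identity. */
    static class BaseMapper {
        void run(List<Object[]> records, OutputBuffer out) throws IOException {
            for (Object[] record : records) {
                map(record[0], record[1], out);
            }
        }

        void map(Object key, Object value, OutputBuffer out) throws IOException {
            out.collect(key, value); // identity: the Long offset key passes straight through
        }
    }

    /** Stand-in for the map output buffer, which compares the exact key class. */
    static class OutputBuffer {
        private final Class<?> expectedKeyClass;

        OutputBuffer(Class<?> expectedKeyClass) {
            this.expectedKeyClass = expectedKeyClass;
        }

        void collect(Object key, Object value) throws IOException {
            if (key.getClass() != expectedKeyClass) {
                throw new IOException("Type mismatch in key from map: expected "
                        + expectedKeyClass.getName() + ", received " + key.getClass().getName());
            }
        }
    }

    /** Stand-in for MultithreadedMapper: run() delegates to innerClass (default: BaseMapper). */
    static class ThreadedWrapper extends BaseMapper {
        Class<? extends BaseMapper> innerClass = BaseMapper.class;

        @Override
        void run(List<Object[]> records, OutputBuffer out) throws IOException {
            BaseMapper inner;
            try {
                inner = innerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IOException(e);
            }
            inner.run(records, out); // the real wrapper does this once per worker thread
        }
    }

    /** Like MyMultithreadMapper in the question: this map() is never reached. */
    static class UserWrapper extends ThreadedWrapper {
        @Override
        void map(Object key, Object value, OutputBuffer out) throws IOException {
            out.collect(value.toString().hashCode(), value); // Integer key
        }
    }

    /** Like the suggested MyMapper: the same logic in an ordinary mapper class. */
    static class UrlHashMapper extends BaseMapper {
        @Override
        void map(Object key, Object value, OutputBuffer out) throws IOException {
            out.collect(value.toString().hashCode(), value); // Integer key
        }
    }

    /** Runs one record through a UserWrapper; returns "ok" or the exception message. */
    public static String runJob(boolean registerInnerMapper) {
        ThreadedWrapper wrapper = new UserWrapper();
        if (registerInnerMapper) {
            wrapper.innerClass = UrlHashMapper.class; // the role of MultithreadedMapper.setMapperClass
        }
        List<Object[]> records = new ArrayList<>();
        records.add(new Object[] {0L, "http://example.com/"});
        try {
            wrapper.run(records, new OutputBuffer(Integer.class)); // expected key type, like IntWritable
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runJob(false));
        System.out.println(runJob(true));
    }
}
```

runJob(false) mirrors the question's setup and returns "Type mismatch in key from map: expected java.lang.Integer, received java.lang.Long"; runJob(true) registers the inner mapper class and returns "ok".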

Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper

Posted by Arsen Zahray <me...@gmail.com>.
Ok. That works.

Thanks again!

On Wed, Sep 28, 2011 at 10:01 AM, Kamesh <ka...@imaginea.com> wrote:

> set the following property
> job.setMapperClass(MyMultithreadMapper.class);

Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper

Posted by Kamesh <ka...@imaginea.com>.
On Wednesday 28 September 2011 12:25 PM, Arsen Zahray wrote:
> Should I set
> job.setMapperClass(MyMapper.class);
> or
> job.setMapperClass(MyMultithreadMapper.class);
> ?

Set the following property:
job.setMapperClass(MyMultithreadMapper.class);
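Put together, the driver needs both calls: job.setMapperClass points the task at the wrapper (an empty subclass such as MyMultithreadMapper works, and so does MultithreadedMapper.class itself), while MultithreadedMapper.setMapperClass names the class each worker thread runs. Setting job.setMapperClass(MyMapper.class) instead would run MyMapper directly, on a single thread. A minimal sketch as a generic helper, assuming the new-API (org.apache.hadoop.mapreduce) classes are on the classpath; the names MultithreadedJobSetup and useThreads are hypothetical.

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

/** Hypothetical helper: wires a plain Mapper behind MultithreadedMapper. */
public final class MultithreadedJobSetup {

    private MultithreadedJobSetup() {
    }

    public static <K1, V1, K2, V2> void useThreads(
            Job job, Class<? extends Mapper<K1, V1, K2, V2>> innerMapper, int threads) {
        // The task runs the wrapper; its run() starts the worker threads.
        job.setMapperClass(MultithreadedMapper.class);
        // Each worker thread instantiates and runs this class. Without this call the
        // wrapper falls back to the identity Mapper, which emits the input keys unchanged.
        MultithreadedMapper.setMapperClass(job, innerMapper);
        MultithreadedMapper.setNumberOfThreads(job, threads);
    }
}
```

In the question's main, a call such as `MultithreadedJobSetup.useThreads(job, MyMapper.class, 5);` would take the place of the existing job.setMapperClass(...) and MultithreadedMapper.setNumberOfThreads(...) lines.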

-- 
Thanks & Regards,
Bh.V.S.Kamesh

Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper

Posted by Arsen Zahray <me...@gmail.com>.
Hey! Thank you for replying!

Please confirm that I understand you correctly:
1. Use a class that extends Mapper:
    class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // implement all logic here
        }
    }
2. Leave the MyMultithreadMapper class empty:
    public class MyMultithreadMapper extends
            MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
    }
3. In main, set
    MultithreadedMapper.setMapperClass(job, MyMapper.class);

Should I set
job.setMapperClass(MyMapper.class);
or
job.setMapperClass(MyMultithreadMapper.class);
?

Arsen

On Wed, Sep 28, 2011 at 9:38 AM, Kamesh <ka...@imaginea.com> wrote:

> implement the map logic in a separate Mapper class and in the main method
> set the following property
> MultithreadedMapper.setMapperClass(job, MyMapper.class);

Re: Type mismatch in key from map when replacing Mapper with MultithreadMapper

Posted by Kamesh <ka...@imaginea.com>.
On Wednesday 28 September 2011 11:33 AM, Arsen Zahray wrote:
> MultithreadMapper extends MultithreadedMapper<LongWritable, Text, 
> IntWritable, MyPage> {
>
>     ConcurrentLinkedQueue<MyScraper>    scrapers    = new 
> ConcurrentLinkedQueue<MyScraper>();
>
>     public static final int                nThreads    = 5;
>
>     public MyMultithreadMapper() {
>         for (int i = 0; i < nT
Implement the map logic in a separate Mapper class and, in the main method, set the following property:
MultithreadedMapper.setMapperClass(job, MyMapper.class);
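A sketch of that separate Mapper class, under stated assumptions: it keeps the thread's name MyMapper but, to stay self-contained, replaces the question's MyScraper/MyPage step with a hypothetical simplification that passes the input line through as a Text value (a job using it as written would declare Text.class as its output value class). It needs the Hadoop MapReduce client libraries on the classpath. MultithreadedMapper creates it reflectively, once per worker thread, so it must have a no-argument constructor (a top-level or static nested class), and its instance fields are confined to one thread.

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Sketch of the separate Mapper class. MultithreadedMapper creates one instance per
 * worker thread, so instance fields such as outKey are never shared between threads.
 */
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private final IntWritable outKey = new IntWritable();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String url = line.toString().trim();
        // Hypothetical simplification: the question's MyScraper/MyPage step is replaced
        // by passing the line through unchanged as the value.
        outKey.set(url.hashCode());
        // The key is an IntWritable, matching job.setOutputKeyClass(IntWritable.class).
        context.write(outKey, line);
    }
}
```

With per-thread instances, the ConcurrentLinkedQueue of scrapers from the question is no longer needed: a field such as `private final MyScraper scraper = new MyScraper();` would give each thread its own scraper, provided MyScraper shares no mutable static state.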

-- 
Thanks & Regards,
Bh.V.S.Kamesh