Posted to mapreduce-user@hadoop.apache.org by Eric Sammer <es...@cloudera.com> on 2010/06/18 23:45:16 UTC
Re: Need help with exception when mapper emits different key class from reducer
This took me a full read-through to figure out. The problem is that
you're using your reducer as a combiner, and when the combiner runs,
the output of the map stage becomes the wrong type.
In pseudo-visual-speak:
Object, Int -> Map() -> MyText, Int -> Combine() -> YourText, Int -> EXCEPTION!
When using your reducer as a combiner, the reducer outputs *must*
match the map outputs. In other words, your combiner - which is
*optional* in the chain at Hadoop's pleasure - is changing the key
space. That's a no-no. In your case, you can't reuse your reducer as a
combiner.
(The hint is in the exception: it's occurring in the combiner classes
in Hadoop.)
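If you still want combining here, one option is a separate combiner that
preserves the map output types. Here's an untested sketch against your
posted classes (the IntSumCombiner name is just mine, and it assumes the
imports already in your file):

    public static class IntSumCombiner
            extends Reducer<MyText, IntWritable, MyText, IntWritable> {
        private final IntWritable result = new IntWritable();

        public void reduce(MyText key, Iterable<IntWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // Re-emit the key unchanged: a combiner's output classes must
            // match the map output classes (MyText, IntWritable), because
            // its output is fed back into the same shuffle machinery.
            context.write(key, result);
        }
    }

Then register it with job.setCombinerClass(IntSumCombiner.class) instead
of IntSumReducer.class.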
Hope that helps.
On Fri, Jun 18, 2010 at 2:09 PM, Steve Lewis <lo...@gmail.com> wrote:
>
> This class is a copy of a standard WordCount class with one critical
> exception: instead of the mapper emitting a key of type Text, it emits a
> key of type MyText, a simple subclass of Text.
> The reducer emits a different subclass of Text, YourText.
> I set
> job.setMapOutputKeyClass(MyText.class);
> job.setMapOutputValueClass(IntWritable.class);
> job.setOutputKeyClass(YourText.class);
> job.setOutputValueClass(IntWritable.class);
> which should declare these classes directly, and yet I get the following
> exception using Hadoop 0.20 on a local box.
> What am I doing wrong?
>
> java.io.IOException: wrong key class: class org.systemsbiology.hadoop.CapitalWordCount$YourText is not class org.systemsbiology.hadoop.CapitalWordCount$MyText
>     at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
>     at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:880)
>     at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1201)
>     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>     at org.systemsbiology.hadoop.CapitalWordCount$IntSumReducer.reduce(CapitalWordCount.java:89)
>
> package org.systemsbiology.hadoop;
>
> import com.lordjoe.utilities.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> import org.apache.hadoop.mapreduce.lib.output.*;
> import org.apache.hadoop.util.*;
>
> import java.io.*;
> import java.util.*;
>
> /**
>  * org.systemsbiology.hadoop.CapitalWordCount
>  */
> public class CapitalWordCount {
>
>     public static class YourText extends Text {
>         public YourText() {
>         }
>
>         /**
>          * Construct from a string.
>          */
>         public YourText(final String string) {
>             super(string);
>         }
>     }
>
>     public static class MyText extends Text {
>         public MyText() {
>         }
>
>         /**
>          * Construct from a string.
>          */
>         public MyText(final String string) {
>             super(string);
>         }
>     }
>
>     public static class TokenizerMapper
>             extends Mapper<Object, Text, MyText, IntWritable> {
>         private final static IntWritable one = new IntWritable(1);
>         private MyText word = new MyText();
>
>         public void map(Object key, Text value, Context context)
>                 throws IOException, InterruptedException {
>             StringTokenizer itr = new StringTokenizer(value.toString());
>             while (itr.hasMoreTokens()) {
>                 String s = itr.nextToken().toUpperCase();
>                 s = dropNonLetters(s);
>                 if (s.length() > 0) {
>                     word.set(s);
>                     context.write(word, one);
>                 }
>             }
>         }
>     }
>
>     public static String dropNonLetters(String s) {
>         StringBuilder sb = new StringBuilder();
>         for (int i = 0; i < s.length(); i++) {
>             char c = s.charAt(i);
>             if (Character.isLetter(c))
>                 sb.append(c);
>         }
>         return sb.toString();
>     }
>
>     public static class IntSumReducer
>             extends Reducer<MyText, IntWritable, YourText, IntWritable> {
>         private IntWritable result = new IntWritable();
>
>         public void reduce(MyText key, Iterable<IntWritable> values,
>                            Context context)
>                 throws IOException, InterruptedException {
>             int sum = 0;
>             for (IntWritable val : values) {
>                 sum += val.get();
>             }
>             result.set(sum);
>             context.write(new YourText(key.toString()), result);
>         }
>     }
>
>     public static class MyPartitioner extends Partitioner<Text, IntWritable> {
>         /**
>          * Get the partition number for a given key (hence record) given the
>          * total number of partitions, i.e. the number of reduce tasks for
>          * the job.
>          * <p>Typically a hash function on all or a subset of the key.</p>
>          *
>          * @param key           the key to be partitioned.
>          * @param value         the entry value.
>          * @param numPartitions the total number of partitions.
>          * @return the partition number for the <code>key</code>.
>          */
>         @Override
>         public int getPartition(Text key, IntWritable value, int numPartitions) {
>             String s = key.toString();
>             if (s.length() == 0)
>                 return 0;
>             char c = s.charAt(0);
>             int letter = Character.toUpperCase(c) - 'A';
>             if (letter < 0 || letter > 26)
>                 return 0;
>             return letter % numPartitions;
>         }
>     }
>
>     /**
>      * Force loading of needed classes.
>      */
>     public static final Class[] NEEDED =
>             {
>                     org.apache.commons.logging.LogFactory.class,
>                     org.apache.commons.cli.ParseException.class
>             };
>
>     public static final int DEFAULT_REDUCE_TASKS = 14;
>
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
> //        if (otherArgs.length != 2) {
> //            System.err.println("Usage: wordcount <in> <out>");
> //            System.exit(2);
> //        }
>         Job job = new Job(conf, "word count");
>         job.setJarByClass(CapitalWordCount.class);
>         job.setMapperClass(TokenizerMapper.class);
>         job.setCombinerClass(IntSumReducer.class);
>         job.setReducerClass(IntSumReducer.class);
>
>         job.setMapOutputKeyClass(MyText.class);
>         job.setMapOutputValueClass(IntWritable.class);
>         job.setOutputKeyClass(YourText.class);
>         job.setOutputValueClass(IntWritable.class);
>
>         // added Slewis
>         job.setNumReduceTasks(DEFAULT_REDUCE_TASKS);
>         job.setPartitionerClass(MyPartitioner.class);
>
>         if (otherArgs.length > 1) {
>             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>         }
>         String athString = otherArgs[otherArgs.length - 1];
>         File out = new File(athString);
>         if (out.exists()) {
>             FileUtilities.expungeDirectory(out);
>             out.delete();
>         }
>         Path outputDir = new Path(athString);
>         FileOutputFormat.setOutputPath(job, outputDir);
>
>         boolean ans = job.waitForCompletion(true);
>         int ret = ans ? 0 : 1;
>         System.exit(ret);
>     }
> }
> --
> Steven M. Lewis PhD
> Institute for Systems Biology
> Seattle WA
>
--
Eric Sammer
twitter: esammer
data: www.cloudera.com
Re: Need help with exception when mapper emits different key class from reducer
Posted by Ted Yu <yu...@gmail.com>.
There is no need to call job.setCombinerClass(); the combiner is
optional.
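For a plain word count the job runs fine with the combiner line simply
removed - a sketch of the relevant part of your main(), nothing else
changed:

    job.setMapperClass(TokenizerMapper.class);
    // no setCombinerClass() call - the combine step is skipped entirely
    job.setReducerClass(IntSumReducer.class);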
On Sat, Jun 19, 2010 at 10:01 AM, Steve Lewis <lo...@gmail.com> wrote:
> Wow - I cannot tell you how much I thank you. I totally missed the fact
> that the exception is thrown in the combiner, since I was seeing the
> exception in the reducer - I always thought the combiner was called
> between the mapper and the reducer, not after the reducer.
> Also, does this mean I should use null as a combiner, or use a very
> generic combiner - especially for my real problem, where there is no
> real combiner step?