Posted to common-user@hadoop.apache.org by Saptarshi Guha <sa...@gmail.com> on 2008/11/16 23:18:19 UTC

A question about the combiner, reducer and the Output value class: can they be different?

Hello,
	If my understanding is correct, the combiner will read in values for  
a given key, process it, output it and then **all** values for a key  
are given to the reducer.
	Then it ought to be possible for the combiner to be of the form

	public static class ClosestCenterCB extends MapReduceBase
			implements Reducer<IntWritable, Text, IntWritable, BytesWritable> {
		public void reduce(IntWritable key, Iterator<Text> values,
				OutputCollector<IntWritable, BytesWritable> output, Reporter reporter) {...}
	}
		
	and the reducer:
	public static class ClosestCenterMR extends MapReduceBase
			implements Mapper<LongWritable, Text, IntWritable, Text>,
			           Reducer<IntWritable, BytesWritable, IntWritable, Text> {
		public void reduce(IntWritable key, Iterator<BytesWritable> values,
				OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {...}
	}

	However, when I set up the jobconf
	theJob.setOutputKeyClass(IntWritable.class);
	theJob.setOutputValueClass(Text.class);
	theJob.setReducerClass(ClosestCenterMR.class);
	theJob.setCombinerClass(ClosestCenterCB.class);

	The output value class is Text, and so I get the following error:
	java.io.IOException: wrong value class: class org.apache.hadoop.io.BytesWritable is not class org.apache.hadoop.io.Text
		at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:144)
		at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:626)
		at org.saptarshiguha.clusters.ClusterCenter$ClosestCenterCB.reduce(Unknown Source)
		at org.saptarshiguha.clusters.ClusterCenter$ClosestCenterCB.reduce(Unknown Source)
		at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.combineAndSpill(MapTask.java:904)
		at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:785)
		at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:698)
		at org.apache.hadoop.mapred.MapTask.run(MapTask.java:228)
		at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:157)
	08/11/16 16:48:18 INFO mapred.LocalJobRunner: file:/tmp/input/sample.data:0+308

Saptarshi Guha | saptarshi.guha@gmail.com | http://www.stat.purdue.edu/~sguha
This is your fortune.


Re: A question about the combiner, reducer and the Output value class: can they be different?

Posted by Jeremy Chow <co...@gmail.com>.
Hey Saptarshi ,

In fact there is an interesting wrapper that can help you output many different
types of values: org.apache.hadoop.io.GenericWritable.
You can write your own Writable class that inherits from it. The following is from its
documentation:

A wrapper for Writable instances.

When two sequence files, which have the same Key type but different Value types,
are mapped out to reduce, multiple Value types are not allowed. In this case,
this class can help you wrap instances with different types.

Compared with ObjectWritable, this class is much more effective, because
ObjectWritable will append the class declaration as a String into the output
file in every Key-Value pair.

GenericWritable implements the Configurable interface, so it will be
configured by the framework. The configuration is passed to the wrapped
objects implementing the Configurable interface *before deserialization*.
How to use it:
1. Write your own class, such as GenericObject, which extends
GenericWritable.
2. Implement the abstract method getTypes(), which defines the classes that will
be wrapped in GenericObject in your application. Attention: the classes defined
in the getTypes() method must implement the Writable interface.

The code looks like this:

 public class GenericObject extends GenericWritable {

   private static Class[] CLASSES = {
               ClassType1.class,
               ClassType2.class,
               ClassType3.class,
               };

   protected Class[] getTypes() {
       return CLASSES;
   }

 }

For example,  in your case,

public class YourWritable extends GenericWritable {
  private static Class<? extends Writable>[] CLASSES = null;

  static {
    CLASSES = (Class<? extends Writable>[]) new Class[] {
        org.apache.hadoop.io.IntWritable.class,
        org.apache.hadoop.io.BytesWritable.class};
  }

  public YourWritable () {
  }

  public YourWritable(Writable instance) {
    set(instance);
  }

  @Override
  protected Class<? extends Writable>[] getTypes() {
    return CLASSES;
  }
}

then modify your JobConf like this:

  theJob.setOutputKeyClass(IntWritable.class);
  theJob.setOutputValueClass(YourWritable.class);
  ...

after that, your combiner and reducer classes can be written as:

       public static class ClosestCenterCB extends MapReduceBase
               implements Reducer<IntWritable, Text, IntWritable, YourWritable> {
               public void reduce(IntWritable key, Iterator<Text> values,
                       OutputCollector<IntWritable, YourWritable> output,
                       Reporter reporter) throws IOException {
                   BytesWritable outValue = .... ;
                   output.collect(key, new YourWritable(outValue)); // wrap it
               }
       }

       public static class YourReducer extends MapReduceBase
               implements Reducer<IntWritable, YourWritable, IntWritable, YourWritable> {
               public void reduce(IntWritable key, Iterator<YourWritable> values,
                       OutputCollector<IntWritable, YourWritable> output,
                       Reporter reporter) throws IOException {
                   // retrieve the wrapped value like this
                   BytesWritable realValue = (BytesWritable) values.next().get();
                   // generate the output value, then wrap it
                   Text outValue = ...;
                   output.collect(key, new YourWritable(outValue));
               }
       }
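
(A hypothetical sketch, not shown in the reply above: the mapper side. It assumes
the map output value class is also YourWritable, so the value type stays the same
from map through combine to reduce; under that assumption the combiner's iterator
would carry YourWritable instances rather than Text. The class name and the key are
placeholders, and the usual org.apache.hadoop.io / org.apache.hadoop.mapred imports
are assumed.)

       public static class YourMapper extends MapReduceBase
               implements Mapper<LongWritable, Text, IntWritable, YourWritable> {
               public void map(LongWritable offset, Text line,
                       OutputCollector<IntWritable, YourWritable> output,
                       Reporter reporter) throws IOException {
                   // wrap the value as soon as it is emitted; the key is a placeholder
                   output.collect(new IntWritable(0), new YourWritable(new Text(line)));
               }
       }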


You can also check out http://coderplay.javaeye.com/blog/259880; that post walks
you through a complete real-world example.


-- 
My research interests are distributed systems, parallel computing and
bytecode based virtual machine.

http://coderplay.javaeye.com

Re: A question about the combiner, reducer and the Output value class: can they be different?

Posted by Saptarshi Guha <sa...@gmail.com>.
On Nov 16, 2008, at 6:18 PM, Owen O'Malley wrote:

> On Sun, Nov 16, 2008 at 2:18 PM, Saptarshi Guha <saptarshi.guha@gmail.com 
> >wrote:
>
>> Hello,
>>       If my understanding is correct, the combiner will read in values for
>> a given key, process it, output it and then **all** values for a key are
>> given to the reducer.
>
>
> Not quite. The flow looks like RecordReader -> Mapper -> Combiner * ->
> Reducer -> OutputFormat .
>


Yes, I glossed over that bit. Thanks for the correction.

> The Combiner may be called 0, 1, or many times on each key between the
> mapper and reducer. Combiners are just an application-specific optimization
> that compresses the intermediate output. They should not have side effects or
> transform the types. Unfortunately, since there isn't a separate interface
> for Combiners, there isn't a great place to document this requirement.
> I've just filed HADOOP-4668 to improve the documentation.
>

Hmm, I had no idea that the combiner could be called 0 times. Thanks
for the heads-up.

Thank you
Saptarshi


Re: Large number of deletes takes out DataNodes

Posted by Raghu Angadi <ra...@yahoo-inc.com>.
This is a long-known issue: deleting files takes a lot of time, and the
datanode does not heartbeat during that time. Please file a jira so
that the issue percolates up :)

There are more of these cases that result in a datanode being marked dead.

As a workaround, you can double or triple heartbeat.recheck.interval
(default 5000 milliseconds) in the config.
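
For reference, a sketch of what that might look like in the config file (the
property name and the 5000 ms default are taken from the message above; the
value below just triples it, so treat it as an illustration, not a tuned
recommendation):

  <property>
    <name>heartbeat.recheck.interval</name>
    <value>15000</value>
    <description>Tripled from the default mentioned above so that slow
    datanodes are not marked dead as quickly.</description>
  </property>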

Raghu.

Jonathan Gray wrote:
> In many of my jobs I create intermediate data in HDFS.  I keep this around for a number of days for inspection until I delete it in large batches.
> 
> If I attempt to delete all of it at once, the flood of delete messages to the datanodes seems to cause starvation, as they do not seem to be responding to the namenode heartbeat.  CPU utilization is below 5% and the logs just show the deletion of blocks.
> 
> In the worst case (if I'm deleting a few terabytes, about 50% of total capacity across 10 nodes) this causes the master to expire the datanode leases.  Once a datanode finishes its deletions, it reports back to the master and is added back to the cluster.  At this point, its blocks have already started to be reassigned, so it then starts as an empty node.  In one run, this happened to 8 out of 10 nodes before getting back to a steady state.  There were a couple of moments during that run when a number of the blocks had replication 1.
> 
> Obviously I can handle this by deleting less at any one time, but it seems like there might be something wrong.  With no CPU utilization, why does the datanode not respond to the namenode?
> 
> Thanks.
> 
> Jonathan Gray
> 


Large number of deletes takes out DataNodes

Posted by Jonathan Gray <jl...@streamy.com>.
In many of my jobs I create intermediate data in HDFS.  I keep this around for a number of days for inspection until I delete it in large batches.

If I attempt to delete all of it at once, the flood of delete messages to the datanodes seems to cause starvation, as they do not seem to be responding to the namenode heartbeat.  CPU utilization is below 5% and the logs just show the deletion of blocks.

In the worst case (if I'm deleting a few terabytes, about 50% of total capacity across 10 nodes) this causes the master to expire the datanode leases.  Once a datanode finishes its deletions, it reports back to the master and is added back to the cluster.  At this point, its blocks have already started to be reassigned, so it then starts as an empty node.  In one run, this happened to 8 out of 10 nodes before getting back to a steady state.  There were a couple of moments during that run when a number of the blocks had replication 1.

Obviously I can handle this by deleting less at any one time, but it seems like there might be something wrong.  With no CPU utilization, why does the datanode not respond to the namenode?

Thanks.

Jonathan Gray


Re: A question about the combiner, reducer and the Output value class: can they be different?

Posted by Owen O'Malley <om...@apache.org>.
On Sun, Nov 16, 2008 at 2:18 PM, Saptarshi Guha <sa...@gmail.com>wrote:

> Hello,
>        If my understanding is correct, the combiner will read in values for
> a given key, process it, output it and then **all** values for a key are
> given to the reducer.


Not quite. The flow looks like RecordReader -> Mapper -> Combiner * ->
Reducer -> OutputFormat .

The Combiner may be called 0, 1, or many times on each key between the
mapper and reducer. Combiners are just an application-specific optimization
that compresses the intermediate output. They should not have side effects or
transform the types. Unfortunately, since there isn't a separate interface
for Combiners, there isn't a great place to document this requirement.
I've just filed HADOOP-4668 to improve the documentation.


>   Then it ought to be possible for the combiner to be of the form
>      ... Reducer<IntWritable, Text, IntWritable, BytesWritable>
>    and the reducer:
>      ...Reducer<IntWritable, BytesWritable, IntWritable, Text>


Since the combiner may be called an arbitrary number of times, it must have
the same input and output types. So the parts generically look like:

input: InputFormat<K1,V1>
mapper: Mapper<K1,V1,K2,V2>
combiner: Reducer<K2,V2,K2,V2>
reducer: Reducer<K2,V2,K3,V3>
output: RecordWriter<K3,V3>

So you probably need to move the code that was changing the type into the
last step of the mapper.
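
For illustration only, here is a minimal sketch of that arrangement in the old
org.apache.hadoop.mapred API. The class names, the key, and the Text-to-bytes
conversion are placeholders (and the usual org.apache.hadoop.io /
org.apache.hadoop.mapred imports are assumed); the point is simply that the type
change happens in the mapper, so the combiner's input and output value classes are
both BytesWritable and the existing Reducer<IntWritable, BytesWritable, IntWritable,
Text> can stay as it is:

  // The mapper itself does the Text -> BytesWritable conversion, so the
  // intermediate (K2, V2) pair is IntWritable, BytesWritable everywhere.
  public static class ClosestCenterMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, IntWritable, BytesWritable> {
    public void map(LongWritable offset, Text line,
        OutputCollector<IntWritable, BytesWritable> output, Reporter reporter)
        throws IOException {
      byte[] bytes = line.toString().getBytes("UTF-8"); // stand-in for the real encoding
      output.collect(new IntWritable(0 /* placeholder key */), new BytesWritable(bytes));
    }
  }

  // The combiner's input and output types are identical, so running it 0, 1,
  // or many times cannot change the types the reducer sees.
  public static class ClosestCenterCombiner extends MapReduceBase
      implements Reducer<IntWritable, BytesWritable, IntWritable, BytesWritable> {
    public void reduce(IntWritable key, Iterator<BytesWritable> values,
        OutputCollector<IntWritable, BytesWritable> output, Reporter reporter)
        throws IOException {
      // A real combiner would merge the partial records for this key here;
      // passing them through unchanged just keeps the sketch self-contained.
      while (values.hasNext()) {
        output.collect(key, values.next());
      }
    }
  }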

-- Owen