Posted to common-dev@hadoop.apache.org by "Alejandro Abdelnur (JIRA)" <ji...@apache.org> on 2008/05/07 13:01:55 UTC

[jira] Issue Comment Edited: (HADOOP-3149) supporting multiple outputs for M/R jobs

    [ https://issues.apache.org/jira/browse/HADOOP-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12594853#action_12594853 ] 

tucu00 edited comment on HADOOP-3149 at 5/7/08 4:01 AM:
--------------------------------------------------------------------

Regarding Owen's _You should also remove the references to Writable and WritableComparable and make them Object instead._ comment.

Assuming that it refers to generified Objects, I still think removing WritableComparable and Writable from {{MultipleOutputs.collect()}} is not appropriate.

The K and V classes used by the different outputs of a MultipleOutputs may differ from one output to another, and they may also differ from the K and V classes used by the M/R job itself.

Because of this they cannot be bound to the K and V generics of the Mapper and Reducer classes, so {{collect()}} has to accept the base classes.

For example:

{noformat}
  // Job setup: the default output is <Text, Text>, the named outputs use other classes.
  JobConf conf = new JobConf();

  conf.setMapperClass(MOMap.class);
  conf.setReducerClass(MOReduce.class);

  conf.setInputPath(inDir);
  conf.setInputFormat(TextInputFormat.class);

  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(Text.class);

  FileOutputFormat.setOutputPath(conf, outDir);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  // Arguments: conf, output name, output format class, key class, value class.
  MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, LongWritable.class, Text.class);
  // Key/value classes match the mos.collect("sequence", ...) call in the reducer below.
  MultipleOutputs.addNamedOutput(conf, "sequence", SequenceFileOutputFormat.class, Text.class, MapWritable.class);
  ...

public class MOReduce implements Reducer<LongWritable, Text, Text, Text> {
  private MultipleOutputs mos;

  public void configure(JobConf conf) {
    mos = new MultipleOutputs(conf);
  }

  public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    // A reduce call always receives at least one value for its key.
    Text value = values.next();
    // Default output: bound to the Reducer's <Text, Text> generics.
    output.collect(new Text("a"), value);
    // Named outputs: each one has its own key/value classes.
    mos.collect("text", key, new Text("Hello"), reporter);
    mos.collect("sequence", value, new MapWritable(), reporter);
  }

  public void close() throws IOException {
    mos.close();
  }

}
{noformat}
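
The typing argument above can also be illustrated without Hadoop. The following is a minimal sketch in plain Java, not the patch's implementation: {{TypedCollector}}, {{NamedOutputs}} and {{NamedOutputTypingSketch}} are hypothetical stand-ins, and {{Long}}, {{String}} and {{Map}} stand in for the Writable classes. It shows that a collector bound to one K and V pair cannot serve outputs with different classes, so the named-output {{collect()}} takes a base type and checks the configured classes at runtime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a collector bound to the reducer's K and V generics.
interface TypedCollector<K, V> {
    void collect(K key, V value);
}

// Hypothetical stand-in for the multi-output helper: every named output declares
// its own key/value classes, so collect() can only be typed with a common base type.
class NamedOutputs {
    private final Map<String, Class<?>> keyClasses = new HashMap<>();
    private final Map<String, Class<?>> valueClasses = new HashMap<>();
    private final Map<String, List<String>> written = new HashMap<>();

    void addNamedOutput(String name, Class<?> keyClass, Class<?> valueClass) {
        keyClasses.put(name, keyClass);
        valueClasses.put(name, valueClass);
        written.put(name, new ArrayList<>());
    }

    // Base-typed signature; the per-output classes are verified at runtime instead.
    void collect(String name, Object key, Object value) {
        Class<?> keyClass = keyClasses.get(name);
        if (keyClass == null) {
            throw new IllegalArgumentException("Undefined named output: " + name);
        }
        if (!keyClass.isInstance(key) || !valueClasses.get(name).isInstance(value)) {
            throw new ClassCastException("Wrong key/value class for named output: " + name);
        }
        written.get(name).add(key + "\t" + value);
    }

    List<String> records(String name) {
        return written.get(name);
    }
}

class NamedOutputTypingSketch {
    static NamedOutputs run() {
        NamedOutputs mos = new NamedOutputs();
        mos.addNamedOutput("text", Long.class, String.class);
        mos.addNamedOutput("sequence", String.class, Map.class);

        // The default output is fixed to <String, String> at compile time.
        List<String> defaultOutput = new ArrayList<>();
        TypedCollector<String, String> output = (k, v) -> defaultOutput.add(k + "\t" + v);
        output.collect("a", "x");

        // The named outputs use other classes, which <String, String> cannot express.
        mos.collect("text", 1L, "Hello");
        mos.collect("sequence", "x", new HashMap<String, String>());
        return mos;
    }

    public static void main(String[] args) {
        System.out.println(run().records("text"));
    }
}
```

The cost of this design is that a mismatched key or value class is only detected when {{collect()}} is called, not at compile time.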





  
> supporting multiple outputs for M/R jobs
> ----------------------------------------
>
>                 Key: HADOOP-3149
>                 URL: https://issues.apache.org/jira/browse/HADOOP-3149
>             Project: Hadoop Core
>          Issue Type: New Feature
>          Components: mapred
>         Environment: all
>            Reporter: Alejandro Abdelnur
>            Assignee: Alejandro Abdelnur
>             Fix For: 0.18.0
>
>         Attachments: patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt
>
>
> The OutputCollector only supports writing data to a single output, the 'part' files in the output path.
> We have found it quite common for our M/R jobs to need to write data to different outputs, for example when classifying data as NEW, UPDATE, DELETE, NO-CHANGE so that each class can be processed differently later.
> Handling the initialization of additional outputs from within the M/R code complicates the code and runs counter to the notion of job configuration.
> It would be desirable to:
> # Configure the additional outputs in the jobconf, potentially specifying different outputformats, key and value classes for each one.
> # Write to the additional outputs in a similar way as data is written to the outputcollector.
> # Support speculative execution semantics for the output files, so that they are only visible in the final output for promoted tasks.
> To support multiple outputs the following classes would be added to mapred/lib:
> * {{MOJobConf}} : extends {{JobConf}} adding methods to define named outputs (name, outputformat, key class, value class)
> * {{MOOutputCollector}} : extends {{OutputCollector}} adding a {{collect(String outputName, WritableComparable key, Writable value)}} method.
> * {{MOMapper}} and {{MOReducer}}: implement {{Mapper}} and {{Reducer}}, adding new {{configure}}, {{map}} and {{reduce}} signatures that take the corresponding {{MO}} classes and perform the proper initialization.
> The data flow behavior would be: key/values written to the default (unnamed) output (using the original OutputCollector {{collect}} signature) take part in the shuffle/sort/reduce processing phases. Key/values written to a named output from within a map don't.
> The named output files would be named using the task type and task ID to avoid collisions among tasks (e.g. 'new-m-00002' and 'new-r-00001').
> Together with the setInputPathFilter feature introduced by HADOOP-2055 it would become very easy to chain jobs working on particular named outputs within a single directory.
> We use this pattern heavily and it has greatly simplified our M/R code as well as the chaining of different M/R jobs.
> We wanted to contribute this back to Hadoop as we think it is a generic feature many could benefit from.
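
The file naming scheme described in the issue can be sketched as follows. This is only an illustration: the class and method names are hypothetical, and the five-digit zero padding is inferred from the 'new-m-00002' and 'new-r-00001' examples, not taken from the patch.

```java
import java.util.Locale;

// Hypothetical helper, not part of the patch.
class NamedOutputFileNames {
    // Combines the output name, the task type ("m" for map, "r" for reduce)
    // and the zero-padded task number so that no two tasks share a file name.
    static String fileName(String outputName, boolean isMapTask, int taskNumber) {
        return String.format(Locale.ROOT, "%s-%s-%05d", outputName, isMapTask ? "m" : "r", taskNumber);
    }

    public static void main(String[] args) {
        System.out.println(fileName("new", true, 2));
        System.out.println(fileName("new", false, 1));
    }
}
```

Because the task type and number are part of the name, a map task and a reduce task writing to the same named output never collide in the shared output directory.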

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.