Posted to common-user@hadoop.apache.org by Christian Schneider <cs...@gmail.com> on 2013/03/13 19:36:18 UTC

Write huge values in Reduce Phase. "Hacked Outputformat" vs. "Direct write to HDFS" vs. ???

Hi all,
I'm not sure which approach to use. Currently I have two. Could you take a
look and tell me which is best?

*Problem:*
As the "value" in the Reduce phase I get a list with *a lot* of values (larger
than the heap size).
For a legacy system I need to create a file like this:

key1 value1, value2, value3, .... valueN
key2 value1, value2, value3, .... valueN

N > 1.000.000

From my research and some other mails, I found these two solutions:


*Solutions:*
*a) "Hacked OutputFormat"*

As described in [0], one solution is to write a custom OutputFormat. The
reducer emits the key only with the first value, then only "null" as the key:

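public void reduce(Text key, Iterator<Text> values,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  boolean firstKey = true;
  // The old mapred API passes an Iterator, so walk it with hasNext()/next().
  while (values.hasNext()) {
    // It's possible to call collect() N times; only the first call carries the key.
    output.collect(firstKey ? key : null, values.next());
    firstKey = false;
  }
}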

With this, the OutputFormat can recognize the "line change" (a non-null key)
and print a "\n" before starting the next key.
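
The writer side of this convention could look roughly like the sketch below.
It is plain Java, not Hadoop's RecordWriter API, so it can be run without a
cluster. The class name LineGroupingWriter is hypothetical, and the " " and
", " separators are assumptions taken from the sample file format above. In a
real job the same logic would presumably go into the write() method of the
RecordWriter returned by the custom OutputFormat.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

/**
 * Hypothetical plain-Java stand-in for the write() logic of a custom writer.
 * A non-null key starts a new output line; a null key appends to the current one.
 */
class LineGroupingWriter {
    private final Writer out;
    private boolean lineOpen = false; // true once a key has been written

    LineGroupingWriter(Writer out) {
        this.out = out;
    }

    void write(String key, String value) throws IOException {
        if (key != null) {
            if (lineOpen) {
                out.write('\n'); // the "line change": finish the previous key's line
            }
            out.write(key);
            out.write(' ');      // assumed key/value separator
            lineOpen = true;
        } else {
            if (!lineOpen) {
                throw new IllegalStateException("null key before any key was written");
            }
            out.write(", ");     // assumed separator between values
        }
        out.write(value);        // values are streamed, never collected in memory
    }

    void close() throws IOException {
        if (lineOpen) {
            out.write('\n');     // terminate the last line
        }
        out.close();
    }

    public static void main(String[] args) throws IOException {
        StringWriter sink = new StringWriter();
        LineGroupingWriter writer = new LineGroupingWriter(sink);
        writer.write("key1", "value1");
        writer.write(null, "value2");
        writer.write(null, "value3");
        writer.write("key2", "value1");
        writer.write(null, "value2");
        writer.close();
        System.out.print(sink);
    }
}
```

Because each value is written as soon as it arrives, memory use does not grow
with N. The writer only has to remember whether a line is currently open.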

The advantage here is that we follow the whole standard path: Map > ... >
Reduce > OutputFormat > HDFS


*b) "Write to HDFS in the reducer"*

As Harsh J mentioned in [1], it is possible to write to HDFS directly in the
Reduce phase.
He also gave a link [2] to the Hadoop FAQ, which says it's "possible" to
do that.

With this information I implemented this reducer:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserToAppReducer extends Reducer<Text, Text, Text, Text> {
  private static final int BUFFER_SIZE = 5 * 1024 * 1024;

  private BufferedWriter br;

  @Override
  protected void setup(final Context context) throws IOException, InterruptedException {
    final FileSystem fs = FileSystem.get(context.getConfiguration());
    final Path outputPath = FileOutputFormat.getOutputPath(context);

    // File name: attempt id, task id and a random suffix.
    final String fileName = "reducer" + context.getTaskAttemptID().getId() + "_"
        + context.getTaskAttemptID().getTaskID().getId() + "_"
        + new Random(System.currentTimeMillis()).nextInt(10000);

    // Text is UTF-8, so write UTF-8 explicitly instead of the platform default.
    this.br = new BufferedWriter(
        new OutputStreamWriter(fs.create(new Path(outputPath, fileName)), "UTF-8"),
        BUFFER_SIZE);
  }

  @Override
  protected void reduce(final Text appId, final Iterable<Text> userIds, final Context context)
      throws IOException, InterruptedException {
    this.br.append(appId.toString());
    this.br.append('\t');

    // Stream each value straight to HDFS; nothing is collected in memory.
    for (final Text text : userIds) {
      this.br.append(text.toString());
      this.br.append('\t');
    }

    this.br.append('\n');
  }

  @Override
  protected void cleanup(final Context context) throws IOException, InterruptedException {
    this.br.close();
  }
}

*Question:*
Both approaches run fine, but which one should I use, and are there
alternative ways?

Thanks a lot.

Best Regards,
Christian.

[0] http://stackoverflow.com/questions/10140171/handling-large-output-values-from-reduce-step-in-hadoop
[1] http://mail-archives.apache.org/mod_mbox/hadoop-user/201303.mbox/%3CCAOcnVr1r-VcoSe-YFBpNe3qmqvXSUT7z3NfABA0FzbNS_MmVgQ%40mail.gmail.com%3E
[2] http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F


P.S.: Sorry for using html :)