Posted to common-user@hadoop.apache.org by Holger Stenzhorn <ho...@deri.org> on 2008/02/07 19:35:25 UTC

Improving performance for large values in reduce

Hello,

I am creating a small MapReduce application that works on large RDF 
dataset files in triple format (i.e. one RDF triple per line, "<subject> 
<predicate> <object>.").

In the mapper class I split up the triples into subject and object and 
then collect each subject/object as key plus the related complete triple 
as value (see [1]).

In the reducer class I now collect for each key again all collected 
values for the given key (i.e. subject/object) (see [2]):
The problem here is that the "concatenateValues(values)" method 
concatenates all values into one single string which then is collected 
for the given key.
This works fine for a few thousand triples but "gets stuck" in the reduce 
phase once I have, e.g., more than some 300,000 triples to concatenate.

Does anybody have any solution on how this could be worked around?
...or just tell me if the way I am doing things here is plainly stupid?! ;-)

Thank you all very much in advance!

Cheers,
Holger

[1]
...
  private static Pattern PATTERN =
    Pattern.compile("\\s*<(.+)>\\s*<(.+)>\\s*<(.+)>\\s*\\.");
...
  private static class TriplesMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      String line = value.toString();
      Matcher matcher = PATTERN.matcher(line);
      if (matcher.matches()) {
        // Emit the complete triple once under its subject and once under its object.
        String subject = matcher.group(1);
        String object = matcher.group(3);
        output.collect(new Text(subject), new Text(line));
        output.collect(new Text(object), new Text(line));
      }
    }
  }
...


[2]
  private static class TriplesFileReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      // Concatenate all values for this key into one single string and emit it.
      output.collect(key, new Text(concatenateValues(values)));
    }
  }
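
(For reference, concatenateValues(values) is not shown above; a minimal 
sketch of what it presumably does, assuming a StringBuilder-based 
implementation and a newline separator chosen purely for illustration:)

  // Sketch only: builds one big string out of all values for a key.
  // The newline separator is an assumption for illustration.
  private static String concatenateValues(Iterator<Text> values) {
    StringBuilder builder = new StringBuilder();
    while (values.hasNext()) {
      if (builder.length() > 0) {
        builder.append('\n');
      }
      builder.append(values.next().toString());
    }
    return builder.toString();
  }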

Re: Improving performance for large values in reduce

Posted by Holger Stenzhorn <ho...@gmail.com>.
Hi,

I am using a mini-cluster of three machines and have experimented on them 
with several different (sometimes strange) reduce settings (from a single 
reduce per machine up to 10 per machine).
...and the result is (basically) always the same, i.e. the process gets 
stuck (or at least becomes very slow) at ca. 98% overall reduce progress.
In my latest try, this means that all tasks have finished except for two, 
one at 91.41% and the other at 77.77% (as you can see from the log excerpt 
below).

But my guess is that the problem is not (as suggested in another reply by 
Nathan Wang) the concatenateValues() method being called so many times on 
long strings.
(@Nathan: I am already using a StringBuilder. I checked my logic and the 
duplication of substrings is indeed correct and needed.)
I am giving each child process 2GB - is this enough?
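
(For reference, a minimal sketch of how these two knobs - the number of 
reduce tasks and the child JVM heap - are typically set through the JobConf 
API; the values below are only illustrative examples, not necessarily the 
exact settings used here:)

  // Sketch only: example values, set before submitting the job.
  JobConf conf = new JobConf();
  conf.setNumReduceTasks(10);                        // e.g. up to 10 reduces
  conf.set("mapred.child.java.opts", "-Xmx2000m");   // ca. 2GB heap per child JVM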

Cheers,
Holger

2008-02-08 14:25:17,717 INFO org.apache.hadoop.mapred.TaskRunner: 
task_200802081102_0001_r_000022_0 done; removing files.
2008-02-08 14:25:20,056 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:25:23,046 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.16666667% reduce > copy (1 of 2 at 
2.58 MB/s) >
2008-02-08 14:25:23,076 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:25:26,097 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:25:29,087 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.67874503% reduce > reduce
2008-02-08 14:25:29,116 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
...
2008-02-08 14:41:53,217 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.67874503% reduce > reduce
2008-02-08 14:41:53,437 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:41:56,238 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7016634% reduce > reduce
2008-02-08 14:41:56,456 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:41:59,256 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7513813% reduce > reduce
2008-02-08 14:41:59,476 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:42:02,276 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.75286764% reduce > reduce
2008-02-08 14:42:02,497 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:42:05,297 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7628957% reduce > reduce
2008-02-08 14:42:05,517 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:42:08,316 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7628957% reduce > reduce
2008-02-08 14:42:08,557 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
...
2008-02-08 14:43:50,999 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7628957% reduce > reduce
2008-02-08 14:43:51,257 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:43:54,018 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.76810163% reduce > reduce
2008-02-08 14:43:54,278 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:43:57,038 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.76810163% reduce > reduce
2008-02-08 14:43:57,298 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:00,058 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7700999% reduce > reduce
2008-02-08 14:44:00,318 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:03,078 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7700999% reduce > reduce
2008-02-08 14:44:03,338 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:06,098 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7712755% reduce > reduce
2008-02-08 14:44:06,358 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:09,118 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7721278% reduce > reduce
2008-02-08 14:44:09,378 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:12,139 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.7729801% reduce > reduce
2008-02-08 14:44:12,398 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:15,159 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.77449274% reduce > reduce
2008-02-08 14:44:15,418 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 14:44:18,179 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.77449274% reduce > reduce
...
2008-02-08 15:52:24,646 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000008_0 0.9141332% reduce > reduce
2008-02-08 15:52:26,957 INFO org.apache.hadoop.mapred.TaskTracker: 
task_200802081102_0001_r_000015_1 0.777745% reduce > reduce




Arun C Murthy wrote:
>
> On Feb 7, 2008, at 10:35 AM, Holger Stenzhorn wrote:
>
>> Hello,
>>
>> I am creating a small MapReduce application that works on large RDF 
>> dataset files in triple format (i.e. one RDF triple per line, 
>> "<subject> <predicate> <object>.").
>>
>> In the mapper class I split up the triples into subject and object 
>> and then collect each subject/object as key plus the related complete 
>> triple as value (see [1]).
>>
>> In the reducer class I now collect for each key again all collected 
>> values for the given key (i.e. subject/object) (see [2]):
>> The problem here is that the "concatenateValues(values)" method 
>> concatenates all values into one single string which then is 
>> collected for the given key.
>> This works fine for smaller thousands of triples but "gets stuck" in 
>> the reduce phase if I have e.g. more than some 300.000 triples to 
>> concatenate.
>>
>> Does anybody have any solution on how this could be worked around?
>> ...or just tell me if the way I am doing things here is plainly 
>> stupid?! ;-)
>>
>
> Question: How many reduces do you have?
>
> Arun
>> Thank you all very  much in advance!
>>
>> Cheers,
>> Holger
>>
>> [1]
>> ...
>>  private static Pattern PATTERN =
>>    Pattern.compile("\\s*<(.+)>\\s*<(.+)>\\s*<(.+)>\\s*\\.");
>> ...
>>  private static class TriplesMapper extends MapReduceBase
>>    implements Mapper<LongWritable, Text, Text, Text> {
>>
>>    public void map(LongWritable key, Text value,
>>      OutputCollector<Text, Text> output, Reporter reporter) throws 
>> IOException {
>>      String line = new String(value.toString());
>>      Matcher matcher = PATTERN.matcher(line);
>>      if (matcher.matches()) {
>>        String subject = matcher.group(1);
>>        String object = matcher.group(3);
>>        output.collect(new Text(subject), new Text(line));
>>        output.collect(new Text(object), new Text(line));
>>      }
>>    }
>>  }
>> ...
>>
>>
>> [2]
>>  private static class TriplesFileReducer extends MapReduceBase
>>    implements Reducer<Text, Text, Text, Text> {
>>
>>    public void reduce(Text key, Iterator<Text> values,
>>      OutputCollector<Text, Text> output, Reporter reporter) throws 
>> IOException {
>>      output.collect(key, new Text(concatenateValues(values)));
>>    }
>>  }
>

Re: Improving performance for large values in reduce

Posted by Arun C Murthy <ar...@yahoo-inc.com>.
On Feb 7, 2008, at 10:35 AM, Holger Stenzhorn wrote:

> Hello,
>
> I am creating a small MapReduce application that works on large RDF  
> dataset files in triple format (i.e. one RDF triple per line,  
> "<subject> <predicate> <object>.").
>
> In the mapper class I split up the triples into subject and object  
> and then collect each subject/object as key plus the related  
> complete triple as value (see [1]).
>
> In the reducer class I now collect for each key again all collected  
> values for the given key (i.e. subject/object) (see [2]):
> The problem here is that the "concatenateValues(values)" method  
> concatenates all values into one single string which then is  
> collected for the given key.
> This works fine for smaller thousands of triples but "gets stuck"  
> in the reduce phase if I have e.g. more than some 300.000 triples  
> to concatenate.
>
> Does anybody have any solution on how this could be worked around?
> ...or just tell me if the way I am doing things here is plainly  
> stupid?! ;-)
>

Question: How many reduces do you have?

Arun
> Thank you all very  much in advance!
>
> Cheers,
> Holger
>
> [1]
> ...
>  private static Pattern PATTERN =
>    Pattern.compile("\\s*<(.+)>\\s*<(.+)>\\s*<(.+)>\\s*\\.");
> ...
>  private static class TriplesMapper extends MapReduceBase
>    implements Mapper<LongWritable, Text, Text, Text> {
>
>    public void map(LongWritable key, Text value,
>      OutputCollector<Text, Text> output, Reporter reporter) throws  
> IOException {
>      String line = new String(value.toString());
>      Matcher matcher = PATTERN.matcher(line);
>      if (matcher.matches()) {
>        String subject = matcher.group(1);
>        String object = matcher.group(3);
>        output.collect(new Text(subject), new Text(line));
>        output.collect(new Text(object), new Text(line));
>      }
>    }
>  }
> ...
>
>
> [2]
>  private static class TriplesFileReducer extends MapReduceBase
>    implements Reducer<Text, Text, Text, Text> {
>
>    public void reduce(Text key, Iterator<Text> values,
>      OutputCollector<Text, Text> output, Reporter reporter) throws  
> IOException {
>      output.collect(key, new Text(concatenateValues(values)));
>    }
>  }