Posted to common-user@hadoop.apache.org by "Tu, Tiankai" <Ti...@DEShawResearch.com> on 2009/03/29 00:10:19 UTC

Hadoop/HDFS for scientific simulation output data analysis

Hi,

I have been exploring the feasibility of using Hadoop/HDFS to analyze
terabyte-scale scientific simulation output datasets. After a set of
initial experiments, I have a number of questions regarding (1) the
configuration settings and (2) the IO read performance.

------------------------------------------------------------------------
Unlike typical Hadoop applications, post-simulation analysis usually
processes one file at a time. So I wrote a
WholeFileInputFormat/WholeFileRecordReader that reads an entire file
without parsing the content, as suggested by the Hadoop wiki FAQ. 
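
For concreteness, here is a minimal sketch of such an input format against
the old org.apache.hadoop.mapred API used by Hadoop 0.19. It follows the
wiki FAQ approach; the class and method bodies below are illustrative, not
a verbatim copy of my code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;                 // one file == one split == one map task
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new WholeFileRecordReader((FileSplit) split, job);
  }

  // Reads the entire file into a single BytesWritable record, no parsing.
  static class WholeFileRecordReader
      implements RecordReader<NullWritable, BytesWritable> {
    private final FileSplit split;
    private final Configuration conf;
    private boolean processed = false;

    WholeFileRecordReader(FileSplit split, Configuration conf) {
      this.split = split;
      this.conf = conf;
    }

    public boolean next(NullWritable key, BytesWritable value)
        throws IOException {
      if (processed) return false;
      byte[] contents = new byte[(int) split.getLength()];
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }

    public NullWritable createKey() { return NullWritable.get(); }
    public BytesWritable createValue() { return new BytesWritable(); }
    public long getPos() { return processed ? split.getLength() : 0; }
    public float getProgress() { return processed ? 1.0f : 0.0f; }
    public void close() { }
  }
}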

Specifying WholeFileInputFormat as the input file format
(conf.setInputFormat(FrameInputFormat.class)), I constructed a simple
MapReduce program with the sole purpose of measuring how fast Hadoop/HDFS
can read data. Here is the gist of the test program (a sketch of the
driver follows the list):

- The map method does nothing; it returns immediately when called
- No reduce tasks (conf.setNumReduceTasks(0))
- JVM reuse enabled (conf.setNumTasksToExecutePerJvm(-1))
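
Put together, the driver looks roughly like the following (again a sketch
against the 0.19 mapred API; the class names are illustrative):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class ReadBenchmark {

  // The map method does nothing; the whole file has already been read by
  // the record reader, so the job measures pure HDFS read time.
  public static class NoOpMapper extends MapReduceBase
      implements Mapper<NullWritable, BytesWritable, NullWritable, NullWritable> {
    public void map(NullWritable key, BytesWritable value,
                    OutputCollector<NullWritable, NullWritable> output,
                    Reporter reporter) throws IOException {
      // return immediately
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(ReadBenchmark.class);
    conf.setJobName("hdfs-read-benchmark");

    conf.setInputFormat(WholeFileInputFormat.class);  // one whole file per map
    conf.setMapperClass(NoOpMapper.class);
    conf.setOutputFormat(NullOutputFormat.class);     // discard all output
    conf.setOutputKeyClass(NullWritable.class);
    conf.setOutputValueClass(NullWritable.class);
    conf.setNumReduceTasks(0);                        // map-only job
    conf.setNumTasksToExecutePerJvm(-1);              // reuse JVMs across tasks

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    JobClient.runJob(conf);
  }
}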

The detailed hardware/software configurations are listed below:

Hardware:
- 103 nodes, each with two 2.33GHz quad-core processors and 16 GB memory
- One GigE connection from each node to a 1 GigE switch in its rack (3
racks in total)
- Each rack switch has 4 10-GigE connections to a backbone
full-bandwidth 10-GigE switch (second-tier switch)
- Software (md) RAID0 on 4 SATA disks, with a capacity of 500 GB per
node
- Raw RAID0 bulk data transfer rate around 200 MB/s (measured by dd-ing a
4 GB file after dropping the Linux VFS cache)

Software:
- 2.6.26-10smp kernel
- Hadoop 0.19.1
- Three nodes serving as the namenode, secondary namenode, and job
tracker, respectively
- Remaining 100 nodes as slaves, each running both a datanode and a
tasktracker

Relevant hadoop-site.xml setting:
- dfs.namenode.handler.count = 100
- io.file.buffer.size = 67108864
- io.bytes.per.checksum = 67108864
- mapred.task.timeout = 1200000
- mapred.map.max.attempts = 8
- mapred.tasktracker.map.tasks.maximum = 8
- dfs.replication = 3
- topology.script.file.name set to a correct rack-awareness Python script
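
For reference, each of these is an ordinary property entry in
hadoop-site.xml; for example, the two IO settings above were written as:

<property>
  <name>io.file.buffer.size</name>
  <value>67108864</value>
</property>
<property>
  <name>io.bytes.per.checksum</name>
  <value>67108864</value>
</property>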

Dataset characteristics:

- Four datasets, consisting of files of 1 GB, 256 MB, 64 MB, and 2 MB,
respectively
- Each dataset totals 1.6 terabytes of data (that is, 1600 1GB files, 6400
256MB files, etc.)
- Datasets populated into HDFS via a parallel C MPI program (linked with
libhdfs.so) running on the 100 slave nodes
- dfs block size set to the file size (otherwise, accessing a single
file may require network data transfer); see the sketch below
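
Here is a sketch (in Java, not the actual MPI/libhdfs populator) of how a
per-file block size can be requested at write time; the C libhdfs API
exposes the same parameter through hdfsOpenFile():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteWithBlockSize {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    long blockSize = 256L * 1024 * 1024;   // match the file size, e.g. 256 MB
    short replication = 3;
    int bufferSize = conf.getInt("io.file.buffer.size", 4096);

    // The block size is a per-file parameter of create(), so each dataset
    // can use a block size equal to its file size.
    FSDataOutputStream out = fs.create(
        new Path(args[0]), true, bufferSize, replication, blockSize);
    // ... write the simulation output data ...
    out.close();
  }
}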

I launched the test MapReduce jobs one after another (so there was no
interference) and collected the following performance results:

Dataset             Finished in       Failed/Killed   HDFS bytes read      Rack-local  Launched   Data-local
                                      task attempts   (Map=Total)          map tasks   map tasks  map tasks
1GB file dataset    16min 11sec       0/382            2,578,054,119,424       98        1982       1873
256MB file dataset  50min 9sec        0/397            7,754,295,017,472      156        6797       6639
64MB file dataset   4hr 18min 21sec   394/251         28,712,795,897,856      153       26245      26068

The job for the 2MB file dataset failed to run due to the following
error:

09/03/27 21:39:58 INFO mapred.FileInputFormat: Total input paths to
process : 819200
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:2786)
        at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:71)
        at java.io.DataOutputStream.writeByte(DataOutputStream.java:136)
        at org.apache.hadoop.io.UTF8.writeChars(UTF8.java:274)

After running into this error, the job tracker no longer accepted jobs.
I stopped and restarted the job tracker with a larger heap size (8 GB),
but it still didn't accept new jobs.

------------------------------------------------------------------------
Questions:

(1) Why is reading 1GB files significantly faster than reading smaller
files, even though reading a 256MB file is just as much a bulk transfer?

(2) Why are the reported HDFS bytes read significantly higher than the
dataset size (1.6TB)? (The percentage of failed/killed task attempts is
far too low to account for the extra bytes read.)

(3) What is the maximum number (roughly) of input paths the job tracker
can handle? (For scientific simulation output datasets, it is quite
commonplace to have hundreds of thousands to millions of files.)

(4) Even for the 1GB file dataset, considering the percentage of
data-local map tasks (94.5%), the overall end-to-end read bandwidth
(1.69GB/s) is much lower than the potential performance offered by the
hardware (200MB/s local RAID0 read performance, multiplied by 100 slave
nodes). What are the settings I should change (either in the test
MapReduce program or in the config files) to obtain better performance? 

Thank you very much.

Tiankai




RE: Hadoop/HDFS for scientific simulation output data analysis

Posted by "Tu, Tiankai" <Ti...@DEShawResearch.com>.
Thanks for the heads-up, Owen. Do you know how long it took to run the
application? And how many files were processed? I am particularly eager
to know the answer to the second question.

I found an article at http://developer.yahoo.net/blogs/hadoop/2008/09/,
where the total number of cores used was over 30,000. The number of
files in that benchmark run was 14,000. The reported average throughput
for read was 18MB/s on 500 nodes and 66MB/s on 4000 nodes. It was
explained in the article (underneath Table 1) that:

	"The 4000-node cluster throughput was 7 times better than 500's
for writes and 3.6 times better for reads even though the bigger cluster
carried more (4 v/s 2 tasks) per node load than the smaller one."

Is 66MB/s the aggregated read throughput or per-node throughput? If the
latter were the case, the aggregated bandwidth would have been 4000 x
66MB/s = 264 GB/s, and the speedup on 4000 nodes over 500 nodes should
have been (66/18) * (4000/500) = 29.3.



Re: Hadoop/HDFS for scientific simulation output data analysis

Posted by Owen O'Malley <om...@apache.org>.
On Apr 3, 2009, at 1:41 PM, Tu, Tiankai wrote:

> By the way, what is the largest size---in terms of total bytes, number
> of files, and number of nodes---in your applications? Thanks.

The largest Hadoop application that has been documented is the Yahoo  
Webmap.

10,000 cores
500 TB shuffle
300 TB compressed final output

http://developer.yahoo.net/blogs/hadoop/2008/02/yahoo-worlds-largest-production-hadoop.html

-- Owen

RE: Hadoop/HDFS for scientific simulation output data analysis

Posted by "Tu, Tiankai" <Ti...@DEShawResearch.com>.
Thanks for the update and suggestion, Matei. 

I can certainly construct an input text file containing all the files of
a dataset
(http://hadoop.apache.org/core/docs/r0.19.1/streaming.html#How+do+I+process+files%2C+one+per+map%3F),
then let the jobtracker dispatch the file
names to the maps, and open the files directly from within the map
method. But the jobtracker merely treats the file names as text input
and does not make an effort to assign a file (name) to the nodes that
store the file. As a result, a node opening a file is almost certain to
request data from a different data node---which destroys IO locality
(the very strength of Hadoop) and results in worse performance. (I had
verified such behavior earlier using Hadoop streaming.)

By the way, what is the largest size---in terms of total bytes, number
of files, and number of nodes---in your applications? Thanks.



Re: Hadoop/HDFS for scientific simulation output data analysis

Posted by Matei Zaharia <ma...@cloudera.com>.
Hadoop does checksums for each small chunk of the file (512 bytes by
default) and stores a list of checksums for each chunk with the file, rather
than storing just one checksum at the end. This makes it easier to read only
a part of a file and know that it's not corrupt without having to read and
checksum the whole file. It also lets you use smaller / simpler checksums
for each chunk, making them more efficient to compute than the larger
checksum that would be needed to provide the same level of safety for the
whole file.

The default buffer size is, confusingly, not 64 KB but 4 KB. It really is
bad for performance, as you saw. But I'd recommend trying 64 or 128 KB before
jumping to 64 MB. 128K is also the setting Yahoo used in its 2000-node
performance tests (see http://wiki.apache.org/hadoop/FAQ).

The reason big buffers may impair cache locality is that CPU caches are
typically a few MB. If you set your checksum size and buffer size to 64 MB,
then whenever you read a block, the CPU first has to checksum 64 MB worth of
data then start again at the beginning to read it and pass it through your
application. During the checksumming process, the first pages of data fell
out of CPU cache as you checksummed the later ones. Therefore, you have to
read them from memory again during the second scan. If you just had a 64 KB
block, it would stay in cache from the first time you read it. The same
issue happens if instead of checksumming you were copying from one buffer to
another (which does happen at a few places in Hadoop, and they tend to use
io.file.buffer.size). So while I haven't tried measuring performance with 64
MB vs 128 KB, I wouldn't be surprised if it leads to bad behavior, because
it's much higher than what anyone runs in production.

Finally, if you just want to sequentially process a file on each node and
you only want one logical "input record" per map, it might be better not to
use an input format that reads the record into memory at all. Instead, you
can have the map directly open the file, and have your InputFormat just
locate the map on the right node. This avoids copying the whole file into
memory before streaming it through your mapper. If your algorithm does
require random access throughout the file on the other hand, you do need to
read it all in. I think the WholeFileRecordReader in the FAQ is aimed at
files smaller than 256 MB / 1 GB.
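
A minimal sketch of that last suggestion, against the old
org.apache.hadoop.mapred API (class names are illustrative): the input
format hands each map task only the file path, while the FileSplit it is
built from still carries the block locations, so the scheduler can keep
the task data-local; the map then opens the file itself and streams it
through a small buffer.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class FileNameInputFormat extends FileInputFormat<NullWritable, Text> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;                                // one file per map task
  }

  @Override
  public RecordReader<NullWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) {
    final FileSplit fileSplit = (FileSplit) split;
    return new RecordReader<NullWritable, Text>() {
      private boolean done = false;
      public boolean next(NullWritable key, Text value) {
        if (done) return false;
        value.set(fileSplit.getPath().toString()); // record = the file name only
        done = true;
        return true;
      }
      public NullWritable createKey() { return NullWritable.get(); }
      public Text createValue() { return new Text(); }
      public long getPos() { return 0; }
      public float getProgress() { return done ? 1.0f : 0.0f; }
      public void close() { }
    };
  }
}

// Inside the map method the file is then opened and streamed through a
// fixed-size buffer instead of being materialized in memory, e.g.:
//
//   Path file = new Path(value.toString());
//   FileSystem fs = file.getFileSystem(jobConf);  // jobConf saved in configure()
//   FSDataInputStream in = fs.open(file);
//   byte[] buf = new byte[128 * 1024];            // e.g. 128 KB, as suggested above
//   for (int n; (n = in.read(buf)) > 0; ) {
//     // process buf[0..n)
//   }
//   in.close();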


RE: Hadoop/HDFS for scientific simulation output data analysis

Posted by "Tu, Tiankai" <Ti...@DEShawResearch.com>.
Thanks for the comments, Matei.

The machines I ran the experiments on have 16 GB of memory each. I don't
see how a 64 MB buffer could be huge or bad for memory consumption. In
fact, I set it to a much larger value after initial rounds of tests showed
abysmal results using the default 64 KB buffer. Also, why is it
necessary to compute a checksum for every 512 bytes when only an
end-to-end (whole-file) checksum makes sense? I set it to a much larger
value to avoid the overhead.

I didn't quite understand what you meant by bad for cache locality. The
jobs were IO bound in the first place. Any cache effect came second---at
least an order of magnitude smaller. Can you clarify which particular
computation (maybe within Hadoop) was made slow because of a large IO
buffer size?

What bothered you was exactly what bothered me and prompted me to ask
why the job tracker reported far more bytes read by the map tasks. I can
confirm that the experiments were set up correctly. In fact, the numbers
of map tasks were correctly reported by the job tracker: there were 1600
for the 1GB file dataset, 6400 for the 256MB file dataset, and so forth.

Tiankai

 



Re: Hadoop/HDFS for scientific simulation output data analysis

Posted by Matei Zaharia <ma...@cloudera.com>.
Hi Tiankai,

The one strange thing I see in your configuration as described is IO buffer
size and IO bytes per checksum set to 64 MB. This is much higher than the
recommended defaults, which are about 64 KB for buffer size and 1 KB or 512
bytes for checksum. (Actually I haven't seen anyone change the checksum size
from its default of 512 bytes.) Having huge buffers is bad for memory consumption and
cache locality.

The other thing that bothers me is that on your 64 MB data set, you have 28
TB of HDFS bytes read. This is off from number of map tasks * bytes per map
by an order of magnitude. Are you sure that you've generated the data set
correctly and that it's the only input path given to your job? Does
bin/hadoop dfs -dus <path to dataset> come out as 1.6 TB?

Matei
