Posted to common-user@hadoop.apache.org by Alfonso Olias Sanz <al...@gmail.com> on 2008/03/17 14:19:31 UTC

[core-user] Processing binary files Howto??

Hi there.

After reading a bit about the Hadoop framework and trying the WordCount
example, I have several doubts about how to use map/reduce with
binary files.

In my case the binary files are generated on a timeline basis, let's say
one file per hour. The size of each file varies (briefly: we are
getting pictures from space, and the star density differs between
observations). The mappers, rather than receiving the file content,
have to receive the file name.  I read that if the input files
are big (several blocks), they are split among several tasks on the
same or different nodes (depending on the block size).  But we want each
map task to process a whole file rather than a block (or a line of a
file, as in the WordCount sample).

In a previous post to this forum I was recommended to use an
input file containing all the file names, so the mappers would receive the
file name. But there is a drawback related to data locality (this was
also mentioned), because data then has to be moved from one node
to another.  The data is not going to be replicated to all the nodes, so
a task taskA that has to process fileB stored on nodeN has to be executed
on nodeN. How can we achieve that?  And what if a task requires a file
that is on another node; does the framework move the logic to that
node?  We would need to define a URI file map on each node
(hostname/path/filename) for all the files, and tasks would consult the
local URI file map in order to process the files.

Another approach we have thought of is to use the distributed file system
only to load-balance the data among the nodes, and to have our own
processes running on every node (without using the map/reduce framework).
Each process would then access the local node to process the data,
using the DFS API (or checking the local URI file map).  This approach
would be more flexible for us because, depending on the machine
(quad-core, dual-core), we know how many Java threads we can run in order
to get the maximum performance out of it.  Using the framework we
can only set a number of tasks to be executed on every node, and the
setting is the same for all the nodes.
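
The setting in question is the per-TaskTracker task limit; a hedged sketch of
how it appears in a node's hadoop-site.xml (property names differ between
Hadoop versions: older releases have a single mapred.tasktracker.tasks.maximum,
newer ones split it into map and reduce limits), for illustration only:

  <!-- in hadoop-site.xml on one TaskTracker node -->
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>   <!-- e.g. one map slot per core on a quad-core node -->
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>2</value>
  </property>

As far as I know each TaskTracker reads this from its own local configuration
file, so the value does not necessarily have to be identical on every machine.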

URI file map:
once the files are copied to the distributed file system, we need
to create this map. Or is there a way to ask a data node for the files
it holds under a <directory>, rather than getting all the files in that
<directory> across all the nodes?  i.e.

NodeA  /tmp/.../mytask/input/fileA-1
            /tmp/.../mytask/input/fileA-2

NodeB /tmp/.../mytask/input/fileB

A process at nodeB listing the /tmp/.../input directory would get only fileB.
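
For what it's worth, the name node can be asked both for the files under a
<directory> and for the hosts holding each file's blocks, which is essentially
the URI file map described above. A rough sketch (this uses
FileSystem.getFileBlockLocations as found in later Hadoop releases; 2008-era
versions expose the same information under an older method name, so treat the
call as illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.BlockLocation;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class ListFilesWithHosts {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();   // picks up the cluster config
      FileSystem fs = FileSystem.get(conf);       // the DFS configured as default
      Path dir = new Path(args[0]);               // e.g. the job's input directory

      for (FileStatus file : fs.listStatus(dir)) {     // every file under the dir
        if (file.isDir()) continue;
        BlockLocation[] blocks =
            fs.getFileBlockLocations(file, 0, file.getLen());
        for (BlockLocation block : blocks) {
          // hosts that store a replica of this block; a scheduler could
          // use this to run the task on one of them
          for (String host : block.getHosts()) {
            System.out.println(file.getPath() + "\t" + host);
          }
        }
      }
    }
  }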

Any ideas?
Thanks
Alfonso.

Re: [core-user] Processing binary files Howto??

Posted by Alfonso Olias Sanz <al...@gmail.com>.
On 17/03/2008, Ted Dunning <td...@veoh.com> wrote:
>
>  You can certainly do this, but you are simply repeating the work that hadoop
>  developers have already done.

Well, the fact is that we have dispersed clusters, so the running node
agents use HTTP to talk to the grid manager.  At present we are able
to decide the number of app instances + Java threads we can run on each
node based on its number of cores and memory. But as far as I have
seen this is not possible with Hadoop, where you can set the number of
tasks a node can run, but it is the same for all the nodes.



>
>  Can you say what kind of satellite data you will be processing?  If it is
>  imagery, then I would imagine that Google's use of map-reduce to prepare
>  image tiles for Google maps would be an interesting example.
>
Not imagery, raw data from the instruments.


Re: [core-user] Processing binary files Howto??

Posted by Ted Dunning <td...@veoh.com>.
Use the default block size of hadoop if you can (64 MB).

Then don't worry about splitting or replication.  Hadoop will do that.


On 3/17/08 11:12 AM, "Alfonso Olias Sanz" <al...@gmail.com>
wrote:

> But we could also try to make every binary file as big as
> a block (then we need to decide the block size), or, if they are
> bigger than a block, split them before adding the data to the
> cluster.
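
On the block-size point: the block size is chosen by the client when a file is
written, so it can be set per file if the default ever needs overriding. A
minimal sketch, assuming the long-standing FileSystem.create overload that
takes an explicit block size (paths and sizes are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class PutWithBlockSize {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      long blockSize = 64L * 1024 * 1024;   // the 64 MB default mentioned above
      short replication = 3;
      FSDataOutputStream out =
          fs.create(new Path(args[0]), true /* overwrite */,
                    conf.getInt("io.file.buffer.size", 4096),
                    replication, blockSize);
      // ... write the hourly binary data to 'out' ...
      out.close();
    }
  }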


Re: [core-user] Processing binary files Howto??

Posted by Ted Dunning <td...@veoh.com>.
You can certainly do this, but you are simply repeating the work that hadoop
developers have already done.

Can you say what kind of satellite data you will be processing?  If it is
imagery, then I would imagine that Google's use of map-reduce to prepare
image tiles for Google maps would be an interesting example.


On 3/17/08 11:12 AM, "Alfonso Olias Sanz" <al...@gmail.com>
wrote:

> Another thing we want to consider is to make our simple grid aware
> of the data location in order to move the task to the node which
> contains the data: a way of getting the hostname where the
> file's blocks are and then calling the DFS API from that node.


Re: [core-user] Processing binary files Howto??

Posted by Alfonso Olias Sanz <al...@gmail.com>.
Hi Ted

Thanks for the info :)

We do not know yet how the input data is going to be generated,
because the data will be observations from a satellite... still
being designed.  But we could also try to make every binary file as big as
a block (then we need to decide the block size), or, if they are
bigger than a block, split them before adding the data to the
cluster. But each file should be large enough, as you said, for its
processing to take more than 10 seconds.

Then each task (map) will process the files that are stored locally on
a node (the framework takes care of this?) <location transparency>

All this is fine. We already have a grid solution with agents on
every node polling for jobs. Each job sent to a node processes 1-n
files (possibly zipped) of simulated data.

One solution is to move to map/reduce and let the framework do the
distribution of tasks and data.


Another thing we want to consider is to make our simple grid aware
of the data location in order to move the task to the node which
contains the data: a way of getting the hostname where the
file's blocks are and then calling the DFS API from that node.

Cheers
Alfonso



Re: [core-user] Processing binary files Howto??

Posted by Alfonso Olias Sanz <al...@gmail.com>.
On 18/03/2008, Enis Soztutar <en...@gmail.com> wrote:
> Hi, please see below,
>
>
>  Ted Dunning wrote:
>  > This sounds very different from your earlier questions.
>  >
>  > If you have a moderate (10's to 1000's) number of binary files, then it is
>  > very easy to write a special purpose InputFormat that tells hadoop that the
>  > file is not splittable.
>
> @ Ted,
>     actually we have MultiFileInputFormat and MultiFileSplit for exactly
>  this :)
>
>  @ Alfonso,
>     The core of Hadoop does not care about the source of the data (files,
>  database, etc.). The map and reduce functions operate on records, which
>  are just key/value pairs. The job of the
>  InputFormat/InputSplit/RecordReader interfaces is to map the actual data
>  source to records.

Yep, I know this. That is for the logic of the application: it just
receives a record to process.

>
>  So, if a file contains a few records, no record is split between two
>  files, and the total number of files is on the order of ten thousand,
>  you can extend MultiFileInputFormat to return a RecordReader which
>  extracts records from these binary files.
>
>  If the above does not apply, you can concatenate all the files into a
>  smaller number of files and then use FileInputFormat. Your RecordReader
>  implementation is then responsible for finding the record boundaries and
>  extracting the records.
>
>  In both options, storing the files in DFS and using map-red is a wise
>  choice, since mapred over DFS already has locality optimizations. But if
>  you must, you can distribute the files to the nodes manually and
>  implement an ad hoc Partitioner which ensures the map task is executed
>  on the node that has the relevant files.
>

I do not want to distribute the files to the nodes. The files are
already dispersed and replicated across the cluster nodes. But on top
of HDFS we are using our own grid software.

So, in order to make our grid aware of the data location, we have to
ask HDFS which node/s a file is on. Then the grid workload
manager will run the application on one of the nodes that contain the
file.

We need to get this approach running first. Then we will see how we
can move to a Map/Reduce implementation, because that requires some
extra development.

Getting the nodes where a file is stored seems to be possible, but I
already posted a new thread because I couldn't get this to work for
every file.
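
In case it helps anyone doing the same thing from outside the Java API, the
HDFS fsck tool can also report which datanodes hold the blocks of a given file
(the exact output format depends on the version):

  hadoop fsck /path/in/dfs/fileB -files -blocks -locations

The path above is only an illustration; substitute the real DFS path.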

Re: [core-user] Processing binary files Howto??

Posted by Enis Soztutar <en...@gmail.com>.
Hi, please see below,

Ted Dunning wrote:
> This sounds very different from your earlier questions.
>
> If you have a moderate (10's to 1000's) number of binary files, then it is
> very easy to write a special purpose InputFormat that tells hadoop that the
> file is not splittable.  
@ Ted,
    actually we have MultiFileInputFormat and MultiFileSplit for exactly 
this :)

@ Alfonso,
    The core of Hadoop does not care about the source of the data (files,
database, etc.). The map and reduce functions operate on records, which
are just key/value pairs. The job of the
InputFormat/InputSplit/RecordReader interfaces is to map the actual data
source to records.

So, if a file contains a few records, no record is split between two
files, and the total number of files is on the order of ten thousand,
you can extend MultiFileInputFormat to return a RecordReader which
extracts records from these binary files.
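
A rough sketch of what such an extension could look like, assuming the
MultiFileInputFormat/MultiFileSplit API roughly as it stood around Hadoop 0.16
(class names here are invented, and the reader is simplified to emit one record
per file, path as key and raw bytes as value; a real reader would parse the
instrument-specific record boundaries instead):

  import java.io.IOException;

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MultiFileInputFormat;
  import org.apache.hadoop.mapred.MultiFileSplit;
  import org.apache.hadoop.mapred.RecordReader;
  import org.apache.hadoop.mapred.Reporter;

  public class BinaryMultiFileInputFormat
      extends MultiFileInputFormat<Text, BytesWritable> {

    public RecordReader<Text, BytesWritable> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
      return new BinaryFileReader((MultiFileSplit) split, job);
    }

    // Emits one (path, contents) pair per file in the split.
    static class BinaryFileReader implements RecordReader<Text, BytesWritable> {
      private final MultiFileSplit split;
      private final JobConf job;
      private int current = 0;

      BinaryFileReader(MultiFileSplit split, JobConf job) {
        this.split = split;
        this.job = job;
      }

      public Text createKey() { return new Text(); }
      public BytesWritable createValue() { return new BytesWritable(); }

      public boolean next(Text key, BytesWritable value) throws IOException {
        if (current >= split.getNumPaths()) return false;
        Path path = split.getPath(current);
        int length = (int) split.getLength(current);
        byte[] contents = new byte[length];
        FSDataInputStream in = path.getFileSystem(job).open(path);
        try {
          in.readFully(0, contents);        // whole file as one record
        } finally {
          in.close();
        }
        key.set(path.toString());
        value.set(contents, 0, length);
        current++;
        return true;
      }

      public long getPos() { return current; }
      public float getProgress() {
        return split.getNumPaths() == 0 ? 1.0f
            : (float) current / split.getNumPaths();
      }
      public void close() { }
    }
  }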

If the above does not apply, you can concatenate all the files into a
smaller number of files and then use FileInputFormat. Your RecordReader
implementation is then responsible for finding the record boundaries and
extracting the records.

In both options, storing the files in DFS and using map-red is a wise
choice, since mapred over DFS already has locality optimizations. But if
you must, you can distribute the files to the nodes manually and
implement an ad hoc Partitioner which ensures the map task is executed
on the node that has the relevant files.


Re: [core-user] Processing binary files Howto??

Posted by Ted Dunning <td...@veoh.com>.

This sounds very different from your earlier questions.

If you have a moderate (10's to 1000's) number of binary files, then it is
very easy to write a special purpose InputFormat that tells hadoop that the
file is not splittable.  This allows you to add all of the files as inputs
to the map step and you will get the locality that you want.  The files
should be large enough so that you take at least 10 seconds or more
processing them to get good performance relative to startup costs.  If they
are not, then you may want to package them up in a form that can be read
sequentially.  This need not be splittable, but it would be nice if it were.
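
A minimal sketch of such a non-splittable InputFormat, assuming the 0.16-era
org.apache.hadoop.mapred API (the class name is invented; the single record
handed to each map task is just the file's path and length, so the map opens
and reads the file itself through the FileSystem API):

  import java.io.IOException;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileSplit;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RecordReader;
  import org.apache.hadoop.mapred.Reporter;

  public class FilenameInputFormat extends FileInputFormat<Text, LongWritable> {

    // Never split a file: each map task gets exactly one whole file, and the
    // scheduler can place the task on a node holding that file's blocks.
    protected boolean isSplitable(FileSystem fs, Path file) {
      return false;
    }

    public RecordReader<Text, LongWritable> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
      final FileSplit fileSplit = (FileSplit) split;
      return new RecordReader<Text, LongWritable>() {
        private boolean done = false;

        public Text createKey() { return new Text(); }
        public LongWritable createValue() { return new LongWritable(); }

        // One record per file: key = file path, value = file length.
        public boolean next(Text key, LongWritable value) {
          if (done) return false;
          key.set(fileSplit.getPath().toString());
          value.set(fileSplit.getLength());
          done = true;
          return true;
        }

        public long getPos() { return 0; }
        public float getProgress() { return done ? 1.0f : 0.0f; }
        public void close() { }
      };
    }
  }

A job would then declare it with conf.setInputFormat(FilenameInputFormat.class)
and add the hourly files as input paths.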

If you are producing a single file per hour, then this style works pretty
well.  In my own work, we have a few compressed and encrypted files each
hour that are map-reduced into a more congenial and splittable form each
hour.  Then subsequent steps are used to aggregate or process the data as
needed.

This gives you all of the locality that you were looking for.
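
One common container for "packaging them up" in a sequentially readable form is
a SequenceFile keyed by the original file name; a hedged sketch (the packing
policy and paths are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;

  public class PackIntoSequenceFile {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      Path inputDir = new Path(args[0]);    // directory of small binary files
      Path packed = new Path(args[1]);      // output SequenceFile

      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, packed, Text.class, BytesWritable.class);
      try {
        for (FileStatus file : fs.listStatus(inputDir)) {
          if (file.isDir()) continue;
          byte[] contents = new byte[(int) file.getLen()];
          FSDataInputStream in = fs.open(file.getPath());
          try {
            in.readFully(0, contents);
          } finally {
            in.close();
          }
          // key = original file name, value = raw bytes of that file
          writer.append(new Text(file.getPath().getName()),
                        new BytesWritable(contents));
        }
      } finally {
        writer.close();
      }
    }
  }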

