You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-user@hadoop.apache.org by Hassen Riahi <ha...@cern.ch> on 2011/06/22 14:51:02 UTC

Parallelize a workflow using mapReduce

Hi all,

I'm looking to parallelize a workflow using mapReduce. The workflow  
can be summarized as following:

1- Specify the list of paths of binary files to process in a  
configuration file (let's call this configuration file CONFIG). These  
binary files are stored in HDFS. This list of path can vary from 1  
files to 10000* files.
2- Process the list of files given in CONFIG: It is done by calling a  
command (let's call it commandX) and giving CONFIG as option, smthg  
like: commandX CONFIG. CommandX reads CONFIG and takes care to open  
the files, process them and generate then the output.
3- Merging...this step can be ignored for now.

The only solutions that I'm seeing to port this workflow to mapReduce  
are:

1- Write a map code which takes as input a list of paths and then call  
appropriately commandX. But, AFAIK, the job will not be split and will  
run as a single mapReduce job over HDFS.
2- Read the input files, and then get the output of the read operation  
and pass it as input to map code. This solution implies a deeper and  
complicated modification of commandX.

Any ideas, comments or suggestions would be appreciated.

Thanks in advance for the help,
Hassen

Re: Parallelize a workflow using mapReduce

Posted by Bibek Paudel <et...@gmail.com>.
On Wed, Jun 22, 2011 at 5:00 PM, Bibek Paudel <et...@gmail.com> wrote:
> On Wed, Jun 22, 2011 at 2:51 PM, Hassen Riahi <ha...@cern.ch> wrote:
>> Hi all,
>>
>> I'm looking to parallelize a workflow using mapReduce. The workflow can be
>> summarized as following:
>>
>> 1- Specify the list of paths of binary files to process in a configuration
>> file (let's call this configuration file CONFIG). These binary files are
>> stored in HDFS. This list of path can vary from 1 files to 10000* files.
>> 2- Process the list of files given in CONFIG: It is done by calling a
>> command (let's call it commandX) and giving CONFIG as option, smthg like:
>> commandX CONFIG. CommandX reads CONFIG and takes care to open the files,
>> process them and generate then the output.
>> 3- Merging...this step can be ignored for now.
>>
>> The only solutions that I'm seeing to port this workflow to mapReduce are:
>>
>> 1- Write a map code which takes as input a list of paths and then call
>> appropriately commandX. But, AFAIK, the job will not be split and will run
>> as a single mapReduce job over HDFS.
>> 2- Read the input files, and then get the output of the read operation and
>> pass it as input to map code. This solution implies a deeper and complicated
>> modification of commandX.
>>
>> Any ideas, comments or suggestions would be appreciated.
>
> Hi,
> If you are looking for a hadoop-oriented solution to this, here is my
> suggestion:
>
> 1. Create a HDFS directory with all your input files in it. If you
> don't want to do this, create a JobConf object and add each input file
> into it (maybe by reading your CONFIG).
>
> 2. Overrise FileInputFormat and return false from isSplitable method-
> this causes each input file to be processed by a single mapper.
>

Of course, the third step would be running CommandX from the mapper :)

-b

Re: Parallelize a workflow using mapReduce

Posted by Joey Echeverria <jo...@cloudera.com>.
> Now, in case that this input file is not split based on HDFS block but
> one-split per file. I will have in consequence only 1 mapper since I have
> only 1 input split. Where the computation of the mapper takes place? in
> machineA or machineB or machine C or in another machine inside the cluster?
> or it is not possible to predict the behavior of the system?

Does the file still take up 3 data blocks? If so, I'd say the solution
is to write the data with a larger block size (block size is a per
file setting). That way, the whole file will be on a single node and
you'll get locality for all of the data.

If the split does cover multiple blocks, the input format would need
to suggest running on a host that contains the most blocks of the
file. I'm not sure if the base FileInputFormat does this. My guess is
it would provide a hint based only on the first block in the file.

-Joey

On Thu, Jun 23, 2011 at 10:12 AM, Hassen Riahi <ha...@cern.ch> wrote:
> Hi,
>
>> Hi,
>>
>>> Between, how the split will be done? I mean the input will be split by
>>> HDFS
>>> block? will I have 1 map task per HDFS block?
>>
>> The default behavior is to split the file based on the HDFS block
>> size, but this depends on the InputFormat and you can also write your
>> own InputFormat to create a split of the size/nature that you want.
>> There are already many InputFormats that other people have written
>> too, please have a look, examples include: splits of N lines,
>> one-split per file and so on.
>>
>> Yes, the default behavior is to have one mapper per input split, but
>> again, this can be overridden by a custom inputformat- for example, if
>> you ask the inputformat not to split a file, and if the file is bigger
>> than the block size.
>>
>> will this workflow benefit
>>>
>>> from Hadoop data locality optimization?
>>>
>>
>> I did not understand this question.
>
> Sorry, I was not clear enough...let's say that I have 1 file stored in HDFS
> and so, let's say that it is split in 3 HDFS blocks. Let's say that these
> HDFS blocks, blockA, blockB and blockC, reside respectively in machineA,
> machineB and machineC.
>
> In another side, let's say also that this file is the input file and it is
> split based on HDFS block and so, I will have one mapper per input split (I
> will have in consequence 3 mappers: mapperA, mapperB and mapperC).
>
> If I understand, it is waited that the mapperA will be executed on machineA,
> and mapperB on machineB...right? if it is the case, that is what I intended
> by the data locality optimization...the fact that each mapper will be
> executed on the machine where data reside optimizes the workflow execution,
> the traffic inside the cluster...
>
> Now, in case that this input file is not split based on HDFS block but
> one-split per file. I will have in consequence only 1 mapper since I have
> only 1 input split. Where the computation of the mapper takes place? in
> machineA or machineB or machine C or in another machine inside the cluster?
> or it is not possible to predict the behavior of the system?
>
> Thanks for the help,
> Hassen
>
>>
>> Thanks,
>> -b
>>
>>>>
>>>> I hope I understood your problem properly, and my suggestion is the
>>>> kind you were looking for.
>>>
>>> Thanks,
>>> Hassen
>>>
>>>>
>>>> Bibek
>>>
>>>
>
>



-- 
Joseph Echeverria
Cloudera, Inc.
443.305.9434

Re: Parallelize a workflow using mapReduce

Posted by Hassen Riahi <ha...@cern.ch>.
Hi,

> Hi,
>
>> Between, how the split will be done? I mean the input will be split  
>> by HDFS
>> block? will I have 1 map task per HDFS block?
>
> The default behavior is to split the file based on the HDFS block
> size, but this depends on the InputFormat and you can also write your
> own InputFormat to create a split of the size/nature that you want.
> There are already many InputFormats that other people have written
> too, please have a look, examples include: splits of N lines,
> one-split per file and so on.
>
> Yes, the default behavior is to have one mapper per input split, but
> again, this can be overridden by a custom inputformat- for example, if
> you ask the inputformat not to split a file, and if the file is bigger
> than the block size.
>
> will this workflow benefit
>> from Hadoop data locality optimization?
>>
>
> I did not understand this question.

Sorry, I was not clear enough...let's say that I have 1 file stored in  
HDFS and so, let's say that it is split in 3 HDFS blocks. Let's say  
that these HDFS blocks, blockA, blockB and blockC, reside respectively  
in machineA, machineB and machineC.

In another side, let's say also that this file is the input file and  
it is split based on HDFS block and so, I will have one mapper per  
input split (I will have in consequence 3 mappers: mapperA, mapperB  
and mapperC).

If I understand, it is waited that the mapperA will be executed on  
machineA, and mapperB on machineB...right? if it is the case, that is  
what I intended by the data locality optimization...the fact that each  
mapper will be executed on the machine where data reside optimizes the  
workflow execution, the traffic inside the cluster...

Now, in case that this input file is not split based on HDFS block but  
one-split per file. I will have in consequence only 1 mapper since I  
have only 1 input split. Where the computation of the mapper takes  
place? in machineA or machineB or machine C or in another machine  
inside the cluster? or it is not possible to predict the behavior of  
the system?

Thanks for the help,
Hassen

>
> Thanks,
> -b
>
>>>
>>> I hope I understood your problem properly, and my suggestion is the
>>> kind you were looking for.
>>
>> Thanks,
>> Hassen
>>
>>>
>>> Bibek
>>
>>


Re: Parallelize a workflow using mapReduce

Posted by Bibek Paudel <et...@gmail.com>.
Hi,

> Between, how the split will be done? I mean the input will be split by HDFS
> block? will I have 1 map task per HDFS block?

The default behavior is to split the file based on the HDFS block
size, but this depends on the InputFormat and you can also write your
own InputFormat to create a split of the size/nature that you want.
There are already many InputFormats that other people have written
too, please have a look, examples include: splits of N lines,
one-split per file and so on.

Yes, the default behavior is to have one mapper per input split, but
again, this can be overridden by a custom inputformat- for example, if
you ask the inputformat not to split a file, and if the file is bigger
than the block size.

 will this workflow benefit
> from Hadoop data locality optimization?
>

I did not understand this question.

Thanks,
-b

>>
>> I hope I understood your problem properly, and my suggestion is the
>> kind you were looking for.
>
> Thanks,
> Hassen
>
>>
>> Bibek
>
>

Re: Parallelize a workflow using mapReduce

Posted by Hassen Riahi <ha...@cern.ch>.
Thanks for the reply!

> On Wed, Jun 22, 2011 at 2:51 PM, Hassen Riahi <ha...@cern.ch>  
> wrote:
>> Hi all,
>>
>> I'm looking to parallelize a workflow using mapReduce. The workflow  
>> can be
>> summarized as following:
>>
>> 1- Specify the list of paths of binary files to process in a  
>> configuration
>> file (let's call this configuration file CONFIG). These binary  
>> files are
>> stored in HDFS. This list of path can vary from 1 files to 10000*  
>> files.
>> 2- Process the list of files given in CONFIG: It is done by calling a
>> command (let's call it commandX) and giving CONFIG as option, smthg  
>> like:
>> commandX CONFIG. CommandX reads CONFIG and takes care to open the  
>> files,
>> process them and generate then the output.
>> 3- Merging...this step can be ignored for now.
>>
>> The only solutions that I'm seeing to port this workflow to  
>> mapReduce are:
>>
>> 1- Write a map code which takes as input a list of paths and then  
>> call
>> appropriately commandX. But, AFAIK, the job will not be split and  
>> will run
>> as a single mapReduce job over HDFS.
>> 2- Read the input files, and then get the output of the read  
>> operation and
>> pass it as input to map code. This solution implies a deeper and  
>> complicated
>> modification of commandX.
>>
>> Any ideas, comments or suggestions would be appreciated.
>
> Hi,
> If you are looking for a hadoop-oriented solution to this, here is my
> suggestion:
>
> 1. Create a HDFS directory with all your input files in it.
it is fine to create a HDFS directory with all my input files.

> If you
> don't want to do this, create a JobConf object and add each input file
> into it (maybe by reading your CONFIG).


>
> 2. Overrise FileInputFormat and return false from isSplitable method-
> this causes each input file to be processed by a single mapper.

it is also fine if the file is split. Since let's say an input file  
file1 is split into file2 and file3, and independently on how the  
split was done, commandX CONFIG(file1) = Merge[(commandX  
CONFIG(file2)) + (commandX CONFIG(file3))]...and as I have mentioned  
before that the merge will be addressed later and for now it is not a  
problem if I'll have 100 output files rather than 1 output file.

Between, how the split will be done? I mean the input will be split by  
HDFS block? will I have 1 map task per HDFS block? will this workflow  
benefit from Hadoop data locality optimization?

>
> I hope I understood your problem properly, and my suggestion is the
> kind you were looking for.

Thanks,
Hassen

>
> Bibek


Re: Parallelize a workflow using mapReduce

Posted by Bibek Paudel <et...@gmail.com>.
On Wed, Jun 22, 2011 at 2:51 PM, Hassen Riahi <ha...@cern.ch> wrote:
> Hi all,
>
> I'm looking to parallelize a workflow using mapReduce. The workflow can be
> summarized as following:
>
> 1- Specify the list of paths of binary files to process in a configuration
> file (let's call this configuration file CONFIG). These binary files are
> stored in HDFS. This list of path can vary from 1 files to 10000* files.
> 2- Process the list of files given in CONFIG: It is done by calling a
> command (let's call it commandX) and giving CONFIG as option, smthg like:
> commandX CONFIG. CommandX reads CONFIG and takes care to open the files,
> process them and generate then the output.
> 3- Merging...this step can be ignored for now.
>
> The only solutions that I'm seeing to port this workflow to mapReduce are:
>
> 1- Write a map code which takes as input a list of paths and then call
> appropriately commandX. But, AFAIK, the job will not be split and will run
> as a single mapReduce job over HDFS.
> 2- Read the input files, and then get the output of the read operation and
> pass it as input to map code. This solution implies a deeper and complicated
> modification of commandX.
>
> Any ideas, comments or suggestions would be appreciated.

Hi,
If you are looking for a hadoop-oriented solution to this, here is my
suggestion:

1. Create a HDFS directory with all your input files in it. If you
don't want to do this, create a JobConf object and add each input file
into it (maybe by reading your CONFIG).

2. Overrise FileInputFormat and return false from isSplitable method-
this causes each input file to be processed by a single mapper.

I hope I understood your problem properly, and my suggestion is the
kind you were looking for.

Bibek

Re: Parallelize a workflow using mapReduce

Posted by Hassen Riahi <ha...@cern.ch>.
Thanks Bobby for the reply! Please find comments inline.

> If your input file is a list of paths each one with \n at the end,  
> the a TextFileInputFormat would split them for you.
>
> I would write it something like the following
>
> Mapper {
>
> Void map(Long offset, String path, collector) {
>   Path p = new Path(path);
>   FileSystem fs = p.getFileSystem(getConf());
>   fs.open;
>   //Copy the file to a temporary location
is this step mandatory? I mean why I can't read the file from HDFS and  
process it from there? What is the gain in doing this step?

>   //Write out temp file path to a CONFIG file
>   try {
>     Process commandx = Runtime.exec(“commandX”,”CONFIG”);
>     //Read output from commandx; and send it to collector
>    } finally {
>       //Delete the temp file
>       //Delete the temp config
>    }
> }


>
> This would launch a new instance of commandX each time.  This  
> assumes that commandX can process a single file at a time, and the  
> the startup overhead is not too big.

This solution can be applied since the commandX can process a single  
file at a time and I will have a map task per file...and that is fine.
However, I can obtain the same result without map/reduce: I can write  
a bash script which split 1 file per task (not a map task...I intend  
by task here a piece of work) and then submit them to the cluster for  
parallel execution. Is it right? or am I missing smthg?

I'm looking to benefit from map/reduce working in conjunction with  
HDFS to optimize the workflows execution and the cluster/storage usage  
(benefit from the data locality optimization...)...If I'm not wrong,  
using map/reduce in conjunction with HDFS means that the splitting  
step will result in a 1 map task / HDFS block and Hadoop will do its  
best to run the map task on a node where the input data resides in HDFS.

> If you do need all the files in one place then you need to look more  
> deeply at what commandx is doing and see how that can be split up  
> into map/reduce.

Sincerely, I would avoid this solution and consider it as the last one  
since I don't know how much time can take and maybe can result in the  
modification of the whole framework...

Thanks
Hassen

>
> --Bobby
>
> On 6/22/11 7:51 AM, "Hassen Riahi" <ha...@cern.ch> wrote:
>
> Hi all,
>
> I'm looking to parallelize a workflow using mapReduce. The workflow
> can be summarized as following:
>
> 1- Specify the list of paths of binary files to process in a
> configuration file (let's call this configuration file CONFIG). These
> binary files are stored in HDFS. This list of path can vary from 1
> files to 10000* files.
> 2- Process the list of files given in CONFIG: It is done by calling a
> command (let's call it commandX) and giving CONFIG as option, smthg
> like: commandX CONFIG. CommandX reads CONFIG and takes care to open
> the files, process them and generate then the output.
> 3- Merging...this step can be ignored for now.
>
> The only solutions that I'm seeing to port this workflow to mapReduce
> are:
>
> 1- Write a map code which takes as input a list of paths and then call
> appropriately commandX. But, AFAIK, the job will not be split and will
> run as a single mapReduce job over HDFS.
> 2- Read the input files, and then get the output of the read operation
> and pass it as input to map code. This solution implies a deeper and
> complicated modification of commandX.
>
> Any ideas, comments or suggestions would be appreciated.
>
> Thanks in advance for the help,
> Hassen
>


Re: Parallelize a workflow using mapReduce

Posted by Robert Evans <ev...@yahoo-inc.com>.
If your input file is a list of paths each one with \n at the end, the a TextFileInputFormat would split them for you.

I would write it something like the following

Mapper {

Void map(Long offset, String path, collector) {
  Path p = new Path(path);
  FileSystem fs = p.getFileSystem(getConf());
  fs.open;
  //Copy the file to a temporary location
  //Write out temp file path to a CONFIG file
  try {
    Process commandx = Runtime.exec("commandX","CONFIG");
    //Read output from commandx; and send it to collector
   } finally {
      //Delete the temp file
      //Delete the temp config
   }
}

This would launch a new instance of commandX each time.  This assumes that commandX can process a single file at a time, and the the startup overhead is not too big.  If you do need all the files in one place then you need to look more deeply at what commandx is doing and see how that can be split up into map/reduce.

--Bobby

On 6/22/11 7:51 AM, "Hassen Riahi" <ha...@cern.ch> wrote:

Hi all,

I'm looking to parallelize a workflow using mapReduce. The workflow
can be summarized as following:

1- Specify the list of paths of binary files to process in a
configuration file (let's call this configuration file CONFIG). These
binary files are stored in HDFS. This list of path can vary from 1
files to 10000* files.
2- Process the list of files given in CONFIG: It is done by calling a
command (let's call it commandX) and giving CONFIG as option, smthg
like: commandX CONFIG. CommandX reads CONFIG and takes care to open
the files, process them and generate then the output.
3- Merging...this step can be ignored for now.

The only solutions that I'm seeing to port this workflow to mapReduce
are:

1- Write a map code which takes as input a list of paths and then call
appropriately commandX. But, AFAIK, the job will not be split and will
run as a single mapReduce job over HDFS.
2- Read the input files, and then get the output of the read operation
and pass it as input to map code. This solution implies a deeper and
complicated modification of commandX.

Any ideas, comments or suggestions would be appreciated.

Thanks in advance for the help,
Hassen