Posted to common-user@hadoop.apache.org by Sky USC <sk...@hotmail.com> on 2012/04/18 23:56:09 UTC

Help me with architecture of a somewhat non-trivial mapreduce implementation


Please help me architect the design of my first significant MR task beyond "word count". My program works well, but I am trying to optimize performance to make the most of the available computing resources. I have 3 questions at the bottom.

Project description in an abstract sense (written in Java):
* I have MM MANIFEST files available on storage, from storage:/root/1.manif.txt to 4000.manif.txt
     * Each MANIFEST in turn contains a variable number "EE" of URLs to EBOOKS (roughly 10,000 - 50,000 ebook URLs per MANIFEST) -- stored at storage:/root/1.manif/1223.folder/5443.Ebook.ebk
So we are talking about millions of ebooks.

My task is to:
1. Fetch each ebook and obtain a set of 3 attributes per ebook (example: publisher, year, ebook-version).
2. Update each EBOOK entry record in the manifest with the 3 attributes (e.g. ebook 1334 -> publisher=aaa, year=bbb, ebook-version=2.01).
3. Create an output file named "<publisher>_<year>_<ebook-version>" that contains a list of all ebook URLs matching that criteria.
example: 
File "storage:/root/summary/RANDOMHOUSE_1999_2.01.txt" contains:
storage:/root/1.manif/1223.folder/2143.Ebook.ebk
storage:/root/2.manif/2133.folder/5449.Ebook.ebk
storage:/root/2.manif/2133.folder/5450.Ebook.ebk
etc..

and File "storage:/root/summary/PENGUIN_2001_3.12.txt" contains:
storage:/root/19.manif/2223.folder/4343.Ebook.ebk
storage:/root/13.manif/9733.folder/2149.Ebook.ebk
storage:/root/21.manif/3233.folder/1110.Ebook.ebk

etc

4. Finally, I also want to output statistics of the form:
<publisher>_<year>_<ebook-version>  <COUNT_OF_URLs>
PENGUIN_2001_3.12     250,111
RANDOMHOUSE_1999_2.01  11,322
etc

Here is how I implemented it (a rough code sketch follows this outline):
* My launcher gets the list of MM manifests.
* My Mapper gets one manifest.
 --- It reads the manifest and, within a WHILE loop,
    --- fetches each EBOOK and obtains its attributes,
    --- updates the manifest entry for that ebook,
    --- calls context.write(new Text("RANDOMHOUSE_1999_2.01"), new Text("storage:/root/1.manif/1223.folder/2143.Ebook.ebk"))
 --- Once all ebooks in the manifest are read, it saves the updated manifest and exits.
* My Reducer gets "RANDOMHOUSE_1999_2.01" and a list of ebook URLs.
 --- It writes a new file "storage:/root/summary/RANDOMHOUSE_1999_2.01.txt" with all the storage URLs for those ebooks.
 --- It also calls context.write(new Text("RANDOMHOUSE_1999_2.01"), new IntWritable(SUM_OF_ALL_EBOOK_URLS_FROM_THE_LIST))
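For concreteness, a minimal sketch of that Mapper/Reducer pair might look like the code below (shown together here, though they would live in separate source files). The Hadoop types are the standard new-API classes; Manifest, EbookEntry, EbookAttributes and SummaryWriter are hypothetical placeholders for the manifest-parsing and storage-client code this thread does not show.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Input value = one line of the launcher's list file, assumed here to hold one manifest URL.
public class ManifestMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text manifestUrl, Context context)
            throws IOException, InterruptedException {
        Manifest manifest = Manifest.fetch(manifestUrl.toString());        // hypothetical helper
        for (EbookEntry entry : manifest.entries()) {
            EbookAttributes attrs = EbookAttributes.fetch(entry.url());    // publisher, year, version
            entry.update(attrs);                                           // update the in-memory manifest
            String key = attrs.publisher() + "_" + attrs.year() + "_" + attrs.version();
            context.write(new Text(key), new Text(entry.url()));
            context.progress();   // long remote fetches: report progress so the task is not timed out
        }
        manifest.save();          // persist the updated manifest back to storage
    }
}

// Key = "<publisher>_<year>_<ebook-version>", values = every ebook URL that produced that key.
public class SummaryReducer extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> urls, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        SummaryWriter out = SummaryWriter.create(key.toString());   // hypothetical: opens storage:/root/summary/<key>.txt
        try {
            for (Text url : urls) {
                out.writeLine(url.toString());
                count++;
            }
        } finally {
            out.close();
        }
        context.write(key, new IntWritable(count));   // per-key URL count for the statistics output
    }
}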

As I mentioned, it's working. I launch it on 15 elastic instances. I have three questions:
1. Is this the best way to implement the MR logic?
2. I don't know whether each of the instances is getting one task or multiple tasks simultaneously for the MAP portion. If it is not getting multiple MAP tasks, should I go the route of "multithreaded" reading of ebooks from each manifest? It's not efficient to read just one ebook at a time per machine. Is "Context.write()" thread-safe?
3. I can see log4j logs for the main program, but have no visibility into logs from the Mapper or Reducer. Any ideas?

Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Robert Evans <ev...@yahoo-inc.com>.
You could also use NLineInputFormat, which will launch 1 mapper for every N (configurable) lines of input.
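To make that concrete, a minimal driver-side sketch could look roughly like this. The driver class name, the choice of 10 lines per split, and the mapper/reducer class names are illustrative placeholders; the NLineInputFormat calls are the new-API (org.apache.hadoop.mapreduce.lib.input) ones, and exact names differ slightly across Hadoop versions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ManifestDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ebook-manifest-scan");   // new Job(conf, ...) on older versions
        job.setJarByClass(ManifestDriver.class);

        // Hand each map task N lines (manifest URLs) of the small list file,
        // instead of one map task per 64MB block.
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 10);             // N = 10, tune as needed
        NLineInputFormat.addInputPath(job, new Path(args[0]));     // the manifest-URL list file

        // Mapper/Reducer as described earlier in the thread (names are placeholders).
        job.setMapperClass(ManifestMapper.class);
        job.setReducerClass(SummaryReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}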


On 4/20/12 9:48 AM, "Sky" <sk...@hotmail.com> wrote:

Thanks! That helped!






Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Sky <sk...@hotmail.com>.
Thanks! That helped!





Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Michael Segel <mi...@hotmail.com>.
If the file is small enough, you could read it into a Java object like a list and write your own InputFormat that takes that list as its input and lets you specify the number of mappers.
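A rough, purely illustrative sketch of that idea (class names and configuration property names are made up here, and error handling is omitted): the driver reads the small URL file into memory, stores the URLs in the job Configuration, and a custom InputFormat cuts the list into however many splits - that is, mappers - you ask for.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class UrlListInputFormat extends InputFormat<NullWritable, Text> {

  public static final String URLS = "urllist.urls";            // comma-separated manifest URLs, set by the driver
  public static final String NUM_MAPPERS = "urllist.mappers";  // desired number of map tasks

  /** One split = one slice of the URL list. Must be Writable so it can be shipped to tasks. */
  public static class UrlSplit extends InputSplit implements Writable {
    private String[] urls;

    public UrlSplit() {}                          // required for deserialization
    public UrlSplit(String[] urls) { this.urls = urls; }

    @Override public long getLength() { return urls.length; }
    @Override public String[] getLocations() { return new String[0]; }   // no data locality here

    @Override public void write(DataOutput out) throws IOException {
      out.writeInt(urls.length);
      for (String u : urls) out.writeUTF(u);
    }
    @Override public void readFields(DataInput in) throws IOException {
      urls = new String[in.readInt()];
      for (int i = 0; i < urls.length; i++) urls[i] = in.readUTF();
    }
  }

  @Override
  public List<InputSplit> getSplits(JobContext ctx) {
    String[] all = ctx.getConfiguration().getStrings(URLS);
    int mappers = ctx.getConfiguration().getInt(NUM_MAPPERS, 1);
    int per = (all.length + mappers - 1) / mappers;              // ceiling division
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (int i = 0; i < all.length; i += per) {
      int end = Math.min(i + per, all.length);
      String[] slice = new String[end - i];
      System.arraycopy(all, i, slice, 0, slice.length);
      splits.add(new UrlSplit(slice));
    }
    return splits;
  }

  @Override
  public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx) {
    // Emits one record per URL in this split's slice.
    return new RecordReader<NullWritable, Text>() {
      private String[] urls;
      private int pos = -1;

      @Override public void initialize(InputSplit s, TaskAttemptContext c) { urls = ((UrlSplit) s).urls; }
      @Override public boolean nextKeyValue() { return ++pos < urls.length; }
      @Override public NullWritable getCurrentKey() { return NullWritable.get(); }
      @Override public Text getCurrentValue() { return new Text(urls[pos]); }
      @Override public float getProgress() { return urls.length == 0 ? 1f : (float) (pos + 1) / urls.length; }
      @Override public void close() {}
    };
  }
}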



Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Sky <sk...@hotmail.com>.
My file for the input to the mapper is very small - all it has is URLs to the list of manifests. The task for the mappers is to fetch each manifest, then fetch files using the URLs from the manifests and process them. Besides passing around lists of files, I am not really accessing the disk. It should be RAM, network, and CPU (unzip, parse XML, extract attributes).

So is my only choice to break up the input file and submit multiple files (if I have 15 cores, should I split the file of URLs into 15 files? And how does that look in code?)? The two drawbacks are that some cores might finish early and stay idle, and I don't know how to deal with dynamically increasing/decreasing cores.

Thx
- Sky



Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Michael Segel <mi...@hotmail.com>.
How 'large' - or rather, in this case, how small - is your file?

If you're on a default system, the block size is 64MB. So if your file is at or under ~64MB, you end up with 1 block, and you will only get 1 mapper.




Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Sky <sk...@hotmail.com>.
Thanks for your reply. After I sent my email, I found a fundamental defect in my understanding of how MR is distributed. I discovered that even though I was firing off 15 COREs, the map job - which is the most expensive part of my processing - ran on only 1 core.

To start my map job, I was creating a single file with the following data:
   1 storage:/root/1.manif.txt
   2 storage:/root/2.manif.txt
   3 storage:/root/3.manif.txt
   ...
   4000 storage:/root/4000.manif.txt

I thought that each of the available COREs would be assigned a map job from the top of that single file, one at a time, and that as soon as one CORE was done, it would get the next map job. However, it looks like I need to split the file into multiple pieces. While that's clearly trivial to do, I am not sure how I can detect at runtime how many splits I need, or how to deal with adding new CORES at runtime. Any suggestions? (It doesn't have to be a file; it can be a list, etc.)

This would all be much easier to debug if somehow I could get my log4j logs for my mappers and reducers. I can see log4j for my main launcher, but I am not sure how to enable it for mappers and reducers.

Thx
- Akash







Re: Help me with architecture of a somewhat non-trivial mapreduce implementation

Posted by Robert Evans <ev...@yahoo-inc.com>.
From what I can see, your implementation seems OK, especially from a performance perspective. Depending on what storage: is, it is likely to be your bottleneck, not the Hadoop computations.

Because you are writing files directly instead of relying on Hadoop to do it for you, you may need to deal with error cases that Hadoop would normally hide from you, and you will not be able to turn on speculative execution. Just be aware that a map or reduce task may hit problems in the middle and be relaunched. So when you are writing out your updated manifest, be careful not to replace the old one until the new one is completely ready and cannot fail, or you may lose data. You may also need to be careful in your reduce if you are writing directly to the file there too, but because that is just a write rather than a read-modify-write, it is not as critical.
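For reference, keeping speculative execution off is just a couple of configuration flags set in the driver before the job is submitted. The wrapper class below is illustrative; the property names are the classic mapred.* ones from the Hadoop 1.x era (later versions spell them mapreduce.map.speculative / mapreduce.reduce.speculative).

import org.apache.hadoop.conf.Configuration;

public class SpeculationOff {
    // Two attempts of the same task must never write the same external file
    // concurrently, so keep speculative execution off for both phases.
    public static void disable(Configuration conf) {
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    }
}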

--Bobby Evans
