Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2010/08/17 21:29:25 UTC

[Pig Wiki] Update of "NativeMapReduce" by Aniket Mokashi

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The "NativeMapReduce" page has been changed by Aniket Mokashi.
http://wiki.apache.org/pig/NativeMapReduce?action=diff&rev1=8&rev2=9

--------------------------------------------------

+ = Under Construction =
  #format wiki
  #language en
  
@@ -18, +19 @@

  To support native mapreduce jobs, Pig will support the following syntax:
  {{{
  X = ... ;
- Y = MAPREDUCE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' USING storeFunc LOAD 'loadLocation' USING loadFunc AS schema [params, ... ];
+ Y = MAPREDUCE 'mymr.jar' [('other.jar', ...)] STORE X INTO 'inputLocation' USING storeFunc LOAD 'outputLocation' USING loadFunc AS schema [`params, ... `];
  }}}
  
- This stores '''X''' into the '''storeLocation''' using '''storeFunc''', which is then used by native mapreduce to read its data. After we run mymr.jar's mapreduce, we load back the data from '''loadLocation''' into alias '''Y''' using '''loadFunc'''.
+ This stores '''X''' into the '''inputLocation''' using '''storeFunc''', which is then used by native mapreduce to read its data. After we run mymr.jar's mapreduce, we load back the data from '''outputLocation''' into alias '''Y''' using '''loadFunc''' as '''schema'''.
  
  params are extra parameters required for native mapreduce job.
  
- '''mymr.jar is any mapreduce jar file which can be run through "hadoop -jar mymr.jar params" command.'''
+ mymr.jar is any mapreduce jar file which can be run through the '''"hadoop jar mymr.jar params"''' command. Thus, the contract for ''inputLocation'' and ''outputLocation'' is typically managed through ''params''.
  
  For example, to run a wordcount mapreduce program from Pig, we write:
  {{{
  A = load 'WordcountInput.txt';
- B = MAPREDUCE wordcount.jar Store A into 'inputDir' Load 'outputDir' as (word:chararray, count: int) org.myorg.WordCount inputDir outputDir;
+ B = MAPREDUCE 'wordcount.jar' Store A into 'inputDir' Load 'outputDir' as (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;
  }}}
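
To make the contract with ''params'' concrete, the following Java sketch shows how the parts of the WordCount statement above would line up with the arguments of a `hadoop jar` invocation. It is an illustration only: `NativeJobArgs` and `build` are hypothetical names, not part of Pig, and the whitespace split is an assumption about how ''params'' would be tokenized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Pig: maps the jar and the backquoted
// params of a MAPREDUCE statement onto "hadoop jar <jar> <params...>".
final class NativeJobArgs {
    private NativeJobArgs() {}

    // Returns the arguments that follow the "hadoop" launcher.
    static List<String> build(String jar, String params) {
        List<String> args = new ArrayList<>();
        args.add("jar");
        args.add(jar);
        String trimmed = params.trim();
        if (!trimmed.isEmpty()) {
            // Assumption: params are split on whitespace; quoting is not handled.
            args.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return args;
    }
}
```

Calling `NativeJobArgs.build("wordcount.jar", "org.myorg.WordCount inputDir outputDir")` yields the five arguments `jar wordcount.jar org.myorg.WordCount inputDir outputDir`. Note that `inputDir` and `outputDir` appear both in the Store/Load clauses and in the params, which is how the native job learns where Pig stored '''A''' and where Pig expects the result.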
  
  == Comparison with similar features ==
@@ -45, +46 @@

  With native job support, Pig can run native map reduce jobs written in Java that convert a data set into a different data set by applying custom map reduce functions of any complexity.
  
  == Implementation Details ==
+ 
  {{{
  X = ... ;
- Y = MAPREDUCE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
+ Y = MAPREDUCE 'mymr.jar' [('other.jar', ...)] STORE X INTO 'inputLocation' USING storeFunc LOAD 'outputLocation' USING loadFunc AS schema [`params, ... `];
  }}}
- Logical Plan- Logical Plan creates a LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level as it would start storing X at storeLocation for every plan that includes X which is not intended. Although we can LOLoad operator for Y at this point, we delay this to physical plan and track this with LONative operator. Also, since Y has dependency on X, we add plan of Y whenever we see plan for X in ''registerQuery''.
  
- Physical Plan- Physical Plan adds the internal store to the physical plan and connects it to X and also adds the load to the plan with alias Y. Also, it creates a dependency between map reduce job for X and native map reduce job, and also between native map reduce job and plan having Y (which is a POLoad operator). We also create a MapReduceOper (customized) for the native map reduce job.
+ === Pig Plans ===
+ Logical Plan- Logical Plan creates a LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level, as it would start storing X at inputLocation for every plan that includes X, which is not intended. Although we could add an LOLoad operator for Y at this point, we delay this to the physical plan and track it with the LONative operator. Since Y has a dataflow dependency on X, we connect the operators corresponding to these aliases in the logical plan.
  
- MapReduce Plan- Inside the JobControlCompiler's compile method if we find the native mapreduce operator we can create a thread and run the Main method of native map reduce job with the specified parameters. Alternatively, we can call into native map reduce job's getJobConf method to get the job conf for the native job, then we can add pig specific parameters to this job and then add the job inside pig's jobcontrol.
+ {{{
+     X = ... ;
+         |
+         |
+         |                            |--- (LOStore) Store X into 'inputLocation'
+     Y = MapReduce ... ;              |
+       (LONative)   --  innerPlan ----|
+         mymr.jar                     |
+         params                       |--- (LOLoad) Load 'outputLocation'
+         |
+         |
+         ...
+ }}}  
+ TypeCastInserter-
  
- == Questions ==
-  1. Do we need a Custom LoadFunc/StoreFunc for this?
-  2. What is the level of customization we need to support?
+ Physical Plan- The logical plan is visited to convert the internal plan (the store and load combination) into the corresponding physical plan operators. Connections between operators are maintained as in the logical plan.
+ {{{
+     X = ... ;
+         |
+         |
+         |                            |--- (POStore) Store X into 'inputLocation'
+     Y = MapReduce ... ;              |
+       (PONative)   --  innerPlan ----|
+         mymr.jar                     |
+         params                       |--- (POLoad) Load 'outputLocation'
+         |
+         |
+         ...
+ }}} 
+ 
+ MapReduce Plan- While compiling the mapreduce plan, with MRCompiler, we introduce 
+ {{{
+     X = ... ;
+         |
+         |
+         |                            |--- (POStore) Store X into 'inputLocation'
+     Y = MapReduce ... ;              |
+       (PONative)   --  innerPlan ----|
+         mymr.jar                     |
+         params                       |--- (POLoad) Load 'outputLocation'
+         |
+         |
+         ...
+ }}}
+ Inside the JobControlCompiler's compile method, if we find the native mapreduce operator, we run the main method of org.apache.hadoop.util.RunJar with the specified parameters.
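
As a rough illustration of running a driver's main method with a prepared argument array, here is a minimal Java sketch. It is not Pig's JobControlCompiler code: `MainInvoker` and `EchoDriver` are hypothetical names, and `EchoDriver` merely stands in for a class such as org.apache.hadoop.util.RunJar so that the example runs without Hadoop on the classpath.

```java
import java.lang.reflect.Method;

// Hypothetical sketch, not taken from Pig: invokes a class's
// "public static void main(String[])" by name.
final class MainInvoker {
    private MainInvoker() {}

    static void invokeMain(String className, String[] args) {
        try {
            Class<?> driver = Class.forName(className);
            Method main = driver.getMethod("main", String[].class);
            // Cast to Object so the array is passed as a single argument
            // instead of being spread as varargs.
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not run " + className, e);
        }
    }
}

// Stand-in driver (assumption): records the arguments it was started with.
final class EchoDriver {
    static String[] lastArgs;

    public static void main(String[] args) {
        lastArgs = args.clone();
    }
}
```

With Hadoop available, the class name passed to `invokeMain` would be the RunJar class and the arguments would be the jar followed by the params. A direct static call would work equally well; reflection is used here only to keep the sketch free of a Hadoop dependency.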
+ 
+ === Security Manager ===
+ The hadoop jar command is equivalent to invoking the main method of org.apache.hadoop.util.RunJar with the required arguments. Internally, RunJar can invoke several levels of driver classes before executing the hadoop job (for example, hadoop-example.jar). With the 
+ 
+ === Pig Stats ===
  
  == References ==
   1. <<Anchor(ref1)>> PIG-506, "Does pig need a NATIVE keyword?", https://issues.apache.org/jira/browse/PIG-506