You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2008/01/22 07:40:57 UTC

[Pig Wiki] Update of "PigExecutionModel" by UtkarshSrivastava

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by UtkarshSrivastava:
http://wiki.apache.org/pig/PigExecutionModel

------------------------------------------------------------------------------
  = Introduction =
  
- The goal is to decide how to structure operator evaluation pipelines in Pig. The major issues include whether data flow follows a push model or a pull model, whether an operator evaluation tree is multi-threaded or single-threaded, and what the API for user-defined functions (UDFs) looks like.
+ __Task__: Redesign Pig logical plan structure and execution engine
  
+ __Goals__:
+  * Address current inefficiencies.
+  * Open up new opportunities for optimization.
- The new model needs to support arbitrary operator DAGs (which may arise in a single pig program, or when jointly executing groups of interrelated programs).
+  * Support arbitrary operator DAGs (which may arise in a single pig program, or when jointly executing groups of interrelated programs).
  
- = Alternative Execution Models =
  
- Some possible execution models are:
+ == Logical Plan Structure ==
  
- == Model 1 ==
+ '''Current Problems''':
+  * We use operators to manipulate the outermost dataset, but eval specs to manipulate the nested data, which reduces code reuse and increases complexity.
+  * Eval specs are assumed to be a linear chain. Thus it makes doing splits and general DAGs difficult.
  
-    * One thread.
-    * Top-level scheduler round-robins through "leaf" operators.
-    * Each time an operator is invoked, it gets handed exactly one tuple.
-    * Special "split" operator buffers data that gets fed to multiple operators; at most one tuple gets buffered at each split point.
-    * UDF API: declare zero or one input bag as "streaming"; init() hands it all data except the streaming bag; next() hands one tuple from streaming bag
+ '''Proposal''':
+  1. Get rid of eval specs, make everything operators
+  1. Since Pig deals with nested data models and allows manipulation of nested data, it is only natural for the logical plan structure to be fully nestable, e.g. the `foreach` operator can have a nested query plan that it uses to process each input tuple.
+  1. Execute the outermost operators using their map-reduce implementations if any (see table below). Execute all nested query plans using local execution mode.
+  1. Add a split operator that replicates input data along multiple outgoing branches. This will help us to support multiple outputs and DAGs.
  
- == Model 2 ==
  
+ '''Advantages of a nested query plan''':
+     * Same operator set used for processing both the outermost, as well as nested data; no code duplication, easier to understand.
+     * Can reuse the local execution mode to process the nested query plans
+     * Can allow for generalization of Pig Latin in the future where the language within `FOREACH` can be the same as that outside it.
-    * One thread per "leaf" operator.
-    * Scheduling done by OS.
-    * Operator gets to read as many tuples as it wants; if it reads from multiple inputs, can interleave "next()" calls on the inputs in arbitrary fashion.
-    * Split operator may buffer up to K tuples (or B bytes); if an operator tries to read too far ahead it gets blocked until other operators reading from the same buffer catch up.
-    * Deadlock can arise; need to detect it and release it by relaxing the K/B constraint on one or more of the split buffers.
  
- == Discussion ==
  
- === Model 1 Drawbacks ===
  
+ Here is a list of proposed operators:
-    * underutilize multi-core systems? (depends whether the policy is to assign several map/reduce tasks to a machine)
-    * difficult (or impossible?) to support operations that require streaming access to multiple inputs (e.g., merge join, merge-based set difference, etc. which operate over pre-sorted input streams)
-    * UDF APIs more complex?
  
- === Model 2 Drawbacks ===
+ || '''Operator''' || '''Attributes''' || '''Number of inputs in query plan''' || '''Semantics''' || '''Implementation (M-R vs local)''' ||
+ || LOAD || file names, load function || 0 || Loads the contents of the files using the given load function into a bag || Same (with file system abstraction layer ||
+ || STORE || file name, store function, in future a hierarchy of fields to partition by || 1 || Stores the input bag into the given file (in future, partitioned by the given field hierarchy) || Same (with file system abstraction layer) ||
+ || FOREACH || nested query plan || 1 || Applies the nested query plan to each item of the input bag to produce an output bag || Same ||
+ || GENERATE || None ||>=1 || Computes the output(s) of its child query plan(s), and concatenates them together into output tuple(s). This will typically be the topmost operator in the nested query plan within the FOREACH operator|| Same ||
+ || FILTER || nested query plan for condition || 1 || Applies the nested query plan to each item of the input bag. If the plan returns true, the item is put in the output bag, otherwise not. || Same ||
+ || GROUP/COGROUP || nested query plans, one for the grouping criteria of each input || >=1 || Applies the appropriate nested query plan to each item of each input to determine its group. Items within the same group are grouped together into bags || Different: M-R will use map-reduce, Local will use our new Sorted``Bag to sort and collect data into bags ||
+ || SORT || list of columns on which to sort, ASC, DESC flags for each column || 1 || Orders the input bag to produce an output bag || Different: M-R will use quantiles and map-reduce to do a parallel sort, local will use Sorted``Bag. ||
+ || DISTINCT (Blocking) || None || 1 || Eliminates duplicates in the input bag to produce an output bag || Different: M-R will rewrite into group/foreach, local will use Distinct``Bag. ||
+ || PROJECT || list of column numbers or schema names || 1 || Selects the specified columns of the input tuple into the output tuple || Same ||
+ || MAP_LOOKUP || a list of keys to lookup || 1 || Selects the contents of the specified keys into an output tuple || Same ||
+ || BINCOND || 3 nested query plans: 1 specifying condition, 1 specifying what to ouptut when condition is true, and 1 specifying what to output when condition is false || 1 || Same as conditional expression in C++/Java (a>b?c:d) || Same ||
+ || COMPARISON (<, <=, >, >=, matches, ==, !=) || None || >=1 || Computes the output(s) of its child query plan(s), and compares them according to the specified logical operator, outputs a boolean || Same ||
+ || AND/OR/NOT || None || >=1 || Computes the (boolean) output(s) of its child query plan(s), and combines them according to the specified logical operator, outputs a boolean || Same ||
+ || CONSTANT || constant value || 0 || Outputs a constant value || Same ||
+ || UDF_APPLY || UDF to apply || >=1 || Computes the output(s) of its child query plans, assembles the results together into a tuple, and applies the UDF using that tuple as the argument. The result is passed on as output. || Same ||
+ || STREAM || program to invoke || >=1 (may have multiple outputs as well) || streams the input to the external program without waiting for output. The output arrives at some later point in time || Same ||
+ || SPLIT || None || 1 (only operator to have multiple outputs apart from STREAM) || replicates input along both output branches || different (depends on our choice of push vs pull model). If pull, M-R buffers to DFS, local buffers in memory (spilling to disk if necessary). ||
+ || UNION || none || >=1 || union of child query plans || Different (Map-side union will be no-op, reduce side will cause break in pipeline) In local mode, straightforward. ||
  
-    * thread synchronization overhead
-    * complexity of multi-threaded implementation
+ == Plan Execution ==
+ 
+ Each of the above logical operators will translate to a physical operator (in many cases, the physical operator will be shared between backends, as shown in the above table).
+ 
+ One physical operators have been linked together into a query plan, they must be executed. There is a choice of mainly 2 models for execution (assume that data flows downwards in an execution plan):
+ 
+  1. '''Push''': Operator A pushes data to B that operates on it, and pushes the result  to C.
+  2. '''Pull''': Operator C asks B for its next data item. If B has nothing pending to return, it asks A. When A returns a data item, B operates on it, and finally returns the result to C.
+ 
+ 
+ '''Pull API'''
+ {{{
+ public interface Pull<T> {
+     /*
+     * T is the return type. In the most general case
+     * it can be Object, so we might omit T altogether.
+     */
+ 
+     public void open();
+ 
+     public T getNext();
+ 
+     public void close();
+ }
+ }}}
+ 
+ '''Push API''':
+ {{{
+ public interface Push<T> {
+     /*
+     * T is the output type. In the most general case
+     * it can be Object, so we might omit T altogether.
+     */
+ 
+     public void open();
+ 
+     public putNext(T);
+ 
+     public void close();
+ }
+ }}}
+ 
+ 
+ Each model has its own unique advantages and disadvantages. Pull is more natural when there are multiple inputs, push is natural when there are multiple outputs. In the context of our operators, there are several places where one of the models is a natural fit, and is hard to replace.
+ 
+ '''Pull''' - Natural use in:
+  * UDFs already pull data by using an iterator.
+  * FILTER, GROUP/COGROUP, BINCOND : Evaluate their nested query plans using pull (can be converted to push, though unnatural).
+  * COMPARISON, AND/OR/NOT: Pull data because they have multiple inputs
+ 
+ '''Push''' - Natual use in:
+  * EvalFunc<Bag>: Eval functions push data (through bag.add()). If we go with a pull model, have to hold in memory any bag output by a function (seems reasonable).
+  * SPLIT: Multiple outputs are most naturally supported through the push model.
+ 
+ Disadvantages:
+ 
+ '''Pull''':
+  * Requires buffering whenever there are multiple outputs.
+  * Requires multiple passes over the data if multiple aggregates are to be computed.
+ 
+ '''Push''':
+  * No convenient API to push data to UDFs that have multiple inputs.
+ 
+ === Proposal ===
+ 
+  1. Single-threaded execution.
+  1. No API changes to UDFs in the short term. In the long term, we might introduce an aggregation function API that is push-based along the lines of the [http://www.postgresql.org/docs/8.2/static/sql-createaggregate.html Postgres user-defined aggregation function API.
+  1. Make the model entirely *push-based*. Reasons:
+     1. Accomodates multiple outputs more naturally.
+     1. Accomodates possible future change of having special aggregate function API to iterate over data only once.
+     1. Functions can still pull data; we will push an entire iterator to them, instead of pushing tuple by tuple.
+  1. Implement hadoop iterator cloning/rewind so that we don't have to do our own disk writes.
+  1. Before hadoop iterator cloning becomes available, we could even materialize bags in memory, as today and this model works.
+  1. When multiple iterators become available (either by reduce-side extension of hadoop), or when doing a map-side cogroup, it will fit nicely into this model.
+ 
+ 
+ === Use Cases ===
+ 
+ To be written
+ 
+ 
  
  === Related Reading ===
  
-    * Fjords paper 
+    * Fjords paper
        * paper: http://db.lcs.mit.edu/madden/html/madden_fjords.pdf
        * slides: http://www.cs.umd.edu/class/fall2002/cmsc818s/Lectures/fjords.pdf
-    * Stream Programming Model / MIT Stream-It 
+    * Stream Programming Model / MIT Stream-It
        * official page for stream-it: http://www.cag.csail.mit.edu/streamit/ (Articles on the compiler might be useful)