Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2008/05/05 16:06:31 UTC

[Pig Wiki] Update of "NestedLogicalPlan" by PiSong

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by PiSong:
http://wiki.apache.org/pig/NestedLogicalPlan

New page:
= Nested Logical Plan Model =
{!!!First draft!!!}

== Benefits ==
 * Supports n-level nested queries.
 * Almost all operators are supported at all levels.
 * Makes the language more flexible.
 * Consistent and logical design, which makes additional features easier to implement in the future.

== What is Load/Store? ==
==== In current implementation ====
 * Load = Load data from file(s) into bag in Pig processing space
 * Store = Write bag from Pig processing space to file(s)

==== In inner plan ====
 * LoadTuple = Load a tuple from a bag in the outer plan into the inner plan
 * StoreTuple = Store an output tuple from the inner plan into the bag in the outer plan

==== Abstraction ====
Load/Store bridges data from one space to another space (and from one type to another type in most cases). 

With the above abstraction, it is possible to define different kinds of Load/Store:-

|| Operator || From Type || To Type ||
|| Legacy Load || File || Bag ||
|| Legacy Store || Bag || File ||
|| LoadTuple || Bag || Tuple ||
|| StoreTuple || Tuple || Bag ||
|| LoadAtom || Tuple || Atom ||
|| StoreAtom || Atom || Tuple ||

 1. This makes it possible to create a consistent nested processing model.
 1. This gives the language more flexibility, beyond plans consisting only of bag-based operators. For example, suppose I have implemented a UDF called TupleMinMax of type Tuple -> Tuple that finds the MIN and MAX across all the elements. One way to try this out in Grunt would be:-
{{{
A = <1,2,3,4,5,6,7,8,9,10> ;
B = TupleMinMax(A) ;
dump B ;

Executing the query..........
B = <1,10>
}}}

The generated plan may look like this:-

{{{
Constant(<1,2,3,4,5,6,7,8,9,10>)
              |
              |
       UDF(TupleMinMax())
}}}
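
A minimal Python sketch of what a Tuple -> Tuple UDF such as TupleMinMax might compute. The function name `tuple_min_max` and the use of Python tuples to stand in for Pig tuples are assumptions made for illustration only; this is not Pig's UDF interface.

```python
from typing import Tuple


def tuple_min_max(t: Tuple[int, ...]) -> Tuple[int, int]:
    """Hypothetical stand-in for TupleMinMax: takes a tuple, returns <min, max>."""
    if not t:
        # Assumption: an empty tuple is treated as an error.
        raise ValueError("empty tuple has no min/max")
    return (min(t), max(t))


# Mirrors the Grunt session above: A = <1,...,10>, B = TupleMinMax(A)
print(tuple_min_max((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))  # prints (1, 10)
```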
Now I want to do real data processing using TupleMinMax. I might write:-

{{{
X = LOAD '/tmp/data1.txt' ;
Y = FILTER X BY ELEM($_) > 0 ;  
Z = FOREACH Y  {
                S = TupleMinMax($_) ;
                GENERATE S.$0 * S.$1, S.$0 + 2 ;  
               } ;
STORE Z INTO '/tmp/output' ;
}}}
NOTE: $0, $1, $2, ... refer only to elements of a tuple, so I imagine $_ could be used for the whole tuple.

The top level plan may look like this:-
{{{
                  Load('/tmp/data1.txt')
                          |
                          |
                        FILTER
                          |
                          |
                        FOREACH 
                          |
                          |
                   Store('/tmp/output')
}}}              
The inner plan in FILTER may look like this :-
{{{
                       LoadTuple
                           |
                           |
                         ELEM (Count the number of elements in tuple)
                           |
                           |
                      GreaterThan 0
                           |
                           |
                       StoreAtom (Will be boolean)
}}}
NOTE: This inner plan is applied to each tuple in the input bag. The FILTER operator iterates through the bag, passes each tuple to the inner plan, and takes the inner plan's output, which is a boolean atom in this example. FILTER then uses that atom to decide whether to forward the tuple to its output port.
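
The FILTER behaviour described in the note can be sketched in Python as follows. The names `filter_op` and `elem_count_positive` are hypothetical, and modelling a bag as a list of Python tuples is an assumption; the sketch only illustrates the "outer operator drives an inner plan" idea.

```python
from typing import Callable, Iterable, Iterator, Tuple

PigTuple = Tuple[object, ...]  # assumption: a Pig tuple modelled as a Python tuple


def elem_count_positive(t: PigTuple) -> bool:
    """Inner plan: LoadTuple -> ELEM -> GreaterThan 0 -> StoreAtom (boolean)."""
    return len(t) > 0


def filter_op(bag: Iterable[PigTuple],
              inner_plan: Callable[[PigTuple], bool]) -> Iterator[PigTuple]:
    """Outer FILTER: feeds each tuple to the inner plan and forwards it if True."""
    for t in bag:
        if inner_plan(t):
            yield t


bag = [(1, 5, 3), (), (7,)]
kept = list(filter_op(bag, elem_count_positive))
print(kept)  # the empty tuple is dropped
```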

The inner plan in FOREACH may look like this:-
{{{
                       LoadTuple
                           |
                           |
                     UDF(TupleMinMax())
                       /         \ 
                      /           \
             AtomProject(0)   AtomProject(1)
                  /    |          |
     Const(2)--PLUS     \        /
                 |       \      / 
                 |        MULTIPLY
                 |          |
                 |          |
            InsertAtom(1)  InsertAtom(0)
                 \          /
                  \        /
                  StoreTuple

}}}
NOTE: In a real implementation, keeping a separate inner plan for each output field might be simpler. For example, "GENERATE $1+$2, ($1+$2)*5" could become one plan for $1+$2 and another plan for ($1+$2)*5, so that we don't have to worry about merging them. /!\ Open question /!\
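
A rough Python sketch of the "one inner plan per output field" option, using the "GENERATE $1+$2, ($1+$2)*5" example. The names `field_plans` and `generate` are hypothetical, and each plan is modelled as a plain function from tuple to atom, which is an assumption.

```python
from typing import Callable, List, Sequence, Tuple

# Assumption: each output field has its own independent plan, Tuple -> Atom.
FieldPlan = Callable[[Sequence[int]], int]

field_plans: List[FieldPlan] = [
    lambda t: t[1] + t[2],         # plan for $1+$2
    lambda t: (t[1] + t[2]) * 5,   # plan for ($1+$2)*5
]


def generate(t: Sequence[int], plans: List[FieldPlan]) -> Tuple[int, ...]:
    """Runs every per-field plan on the same input tuple and packs the results."""
    return tuple(plan(t) for plan in plans)


result = generate((10, 1, 2), field_plans)
print(result)  # prints (3, 15)
```

The trade-off visible here is that $1+$2 is evaluated twice, once in each plan; a merged plan could share that sub-expression at the cost of more complex plan construction.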

==== More examples ====

Given GENERATE: Tuple -> Tuple,
suppose we would like to generate { $1, ($2+5) * $1 } from { $1, $2, $3, $4 }.

The plan may look like this:-

{{{
            LoadTuple
            /        \
AtomProject(1)       AtomProject(2)                   
   |\                      |                    
   | \                     |  --------Constant(5)
   |  \                    | /
   |   \                  ADD
   |    \                  |
   |     \                 |
   |      ----------------\|
   |                      MUL 
   |                       |
InsertAtom(1)           InsertAtom(2)
     \                    /
      \ _____        ____/
             \      /
            StoreTuple
}}}
Diagram B1


This looks similar to a common relational plan:-

{{{
 Load('file1')         Load('file2')
   |                       |
  Filter                 Foreach
   |                       |
   |                      /
   |   -------------------
   |  /                     
  Cogroup
   |
  Store('outputfile')
}}}
Diagram B2

This allows us to fully utilize the notion of inner plans, as we can now define our processing model using a recursive definition.

From diagram B2 and Table C, Foreach is an operator that accepts a bag and outputs a bag. Its internal mechanism is to iterate through all the tuples in the input bag. For each tuple, it performs the data processing defined in its inner plan. A plan that accepts a tuple and outputs a tuple, like B1, can be embedded in the FOREACH operator.
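
As an illustration of embedding a Tuple -> Tuple plan in FOREACH, the following Python sketch wires the plan from diagram B1 into a bag-to-bag operator. The names `plan_b1` and `foreach_op` are hypothetical, and fields are addressed with zero-based indexes so that `t[1]` plays the role of $1; both are assumptions made for the example.

```python
from typing import Callable, Iterable, Iterator, Tuple

PigTuple = Tuple[int, ...]  # assumption: a Pig tuple of integer atoms


def plan_b1(t: PigTuple) -> PigTuple:
    """Inner plan from diagram B1: outputs { $1, ($2+5) * $1 }."""
    p1 = t[1]                # AtomProject(1)
    p2 = t[2]                # AtomProject(2)
    added = p2 + 5           # ADD with Constant(5)
    product = added * p1     # MUL
    return (p1, product)     # InsertAtom x2 -> StoreTuple


def foreach_op(bag: Iterable[PigTuple],
               inner_plan: Callable[[PigTuple], PigTuple]) -> Iterator[PigTuple]:
    """Outer FOREACH: Bag -> Bag, applying the inner plan to every tuple."""
    for t in bag:
        yield inner_plan(t)


bag = [(0, 2, 3, 9), (0, 4, 1, 9)]
out = list(foreach_op(bag, plan_b1))
print(out)  # prints [(2, 16), (4, 24)]
```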

== Dynamic Execution Engine Selection ==
This might not be relevant :)

==== Sample use case ====
We have log files stored in different physical locations, where the directory structure encodes the time dimension. We want to be able to process them all at once.

Here is the directory structure:-

 * /data/01_2008		-- All the log files in Jan
 * /data/02_2008		-- All the log files in Feb
 * /data/03_2008		-- All the log files in Mar

We may extend the LOAD statement to be able to do something like this:-

{{{
X = LOAD '/data/01_2008', '/data/02_2008', '/data/03_2008' 
    AS { <'01_2008', $0 >, <'02_2008', $1 >, <'03_2008', $2 > } ;    /* Alias matching here will be a bit tricky. */
}}}

As can be seen here, $0, $1 and $2 are bags embedded in tuples in a bigger bag.

One may write:-

{{{
Y = ForEach X   
         {
            A = FILTER $1 BY $1.$0 > 100 ;
            B = DISTINCT A ;
            GENERATE $0, FLATTEN B ;
         } ; 
}}}

In terms of the execution plan model, Pig currently supports nested queries like the one above. However, because execution engine selection is static (outer plan ==> MapReduce, inner plan ==> local engine), the above query is unlikely to finish when each input file is huge.

To make the processing engine more flexible, the decision whether to use the distributed backend or the local engine to process a plan should be dynamic. An easy way to do this might be to define a processing size threshold:-
{{{
ProcessingSize > K        Use distributed engine
Otherwise                 Use local engine
}}}
One naive way to measure the ProcessingSize is to use the input data size.
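
The threshold rule could be sketched in Python as below. The names `Engine`, `processing_size` and `choose_engine`, and measuring size in bytes, are assumptions for illustration; nothing here reflects an existing Pig API.

```python
from enum import Enum
from typing import Iterable


class Engine(Enum):
    LOCAL = "local"
    DISTRIBUTED = "distributed"


def processing_size(input_sizes: Iterable[int]) -> int:
    """Naive measure: the total size of the plan's inputs (assumed to be in bytes)."""
    return sum(input_sizes)


def choose_engine(size: int, threshold_k: int) -> Engine:
    """ProcessingSize > K -> distributed engine, otherwise local engine."""
    return Engine.DISTRIBUTED if size > threshold_k else Engine.LOCAL


# Hypothetical numbers: two inputs of 600 and 500 bytes against K = 1000.
print(choose_engine(processing_size([600, 500]), threshold_k=1000))
```

Under this sketch the choice is made per plan, so an inner plan over a huge nested bag could be sent to the distributed engine while a small outer plan stays local.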

== Current Logical Operators as Higher-Order Functions ==

Table C

{{{
FILTER : Bag x (f: Tuple -> Boolean ) -> Bag

COGroup :  [ Bag, f: Tuple -> Tuple ]^n -> Bag

FOREACH  :  Bag x (f: Tuple -> Tuple) -> Bag

GENERATE :  Tuple x (list of flatten flags) -> Tuple
Note: If we just forget the flatten flags of GENERATE, it can be reduced to "GENERATE: Tuple -> Tuple"

PROJECT : Tuple x (list of indices) -> Tuple

CROSS : Bag x Bag -> Bag

JOIN : This can be constructed by COGroup

ORDER : Bag x (f: Tuple x Tuple -> CompareResult) -> Bag

DISTINCT: Bag x (f: Tuple x Tuple -> CompareResult) -> Bag

UNION: Bag x Bag -> Bag

SPLIT: [ Bag, f: Tuple -> Boolean ]^n -> Bag^n
}}}
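
A few of the signatures in Table C, written as Python higher-order functions to make the shapes concrete. Bags are modelled as lists of tuples and all function names are hypothetical; this is only a sketch of the type signatures, not of Pig's operators.

```python
from functools import cmp_to_key
from typing import Callable, List, Tuple

PigTuple = Tuple[int, ...]   # assumption: tuples of integer atoms
Bag = List[PigTuple]         # assumption: a bag modelled as a list


def filter_(bag: Bag, f: Callable[[PigTuple], bool]) -> Bag:
    """FILTER : Bag x (f: Tuple -> Boolean) -> Bag"""
    return [t for t in bag if f(t)]


def foreach(bag: Bag, f: Callable[[PigTuple], PigTuple]) -> Bag:
    """FOREACH : Bag x (f: Tuple -> Tuple) -> Bag"""
    return [f(t) for t in bag]


def order(bag: Bag, cmp: Callable[[PigTuple, PigTuple], int]) -> Bag:
    """ORDER : Bag x (f: Tuple x Tuple -> CompareResult) -> Bag"""
    return sorted(bag, key=cmp_to_key(cmp))


def cross(a: Bag, b: Bag) -> Bag:
    """CROSS : Bag x Bag -> Bag (concatenates every pair of tuples)"""
    return [x + y for x in a for y in b]


def union(a: Bag, b: Bag) -> Bag:
    """UNION : Bag x Bag -> Bag"""
    return a + b


def compare(x: PigTuple, y: PigTuple) -> int:
    """CompareResult as -1, 0 or 1."""
    return (x > y) - (x < y)


bag = [(3, 1), (1, 2), (2, 0)]
print(order(filter_(bag, lambda t: t[0] > 1), compare))  # prints [(2, 0), (3, 1)]
```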

== Problems with current Operators (5-May-2008) ==

==== LOGenerate ====

- In a top-level query, LOGenerate is just a splitter in the syntax (and possibly makes it easier to read). The main operator that has functionality is LOForEach.
{{{
X = FOREACH Y GENERATE $0, $1 ;
}}}
- In a nested query, GENERATE is just a marker before the list of expected outputs. Again, all the processing goes to LOForEach.
{{{
Y = ForEach X   
         {
            B = DISTINCT $1 ;
            GENERATE $0, FLATTEN B ;
         } ;
}}}
is equivalent to
{{{
Y = ForEach X GENERATE $0, FLATTEN(DISTINCT($1)) ;
}}}
It seems that LOGenerate is not needed at all. GENERATE is more like a part of the FOREACH syntax (analogous to BY in FILTER).

==== LOProject ====
This operator only maps an input tuple to an output tuple (e.g. {A,B,C,D,E} ==> {A,C,D}). Given that we allow users to write fields in COGROUP, FILTER and FOREACH as expressions, LOProject becomes just a special case in which users merely specify a direct mapping. Since we have agreed upon the concept of inner plans, I think LOProject is not needed.