Posted to dev@pig.apache.org by "Zhijie Shen (JIRA)" <ji...@apache.org> on 2011/06/16 05:37:47 UTC

[jira] [Commented] (PIG-1916) Nested cross

    [ https://issues.apache.org/jira/browse/PIG-1916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13050201#comment-13050201 ] 

Zhijie Shen commented on PIG-1916:
----------------------------------

Here is a report on my progress and the issues I have run into.

First, by adding the nested cross syntax (e.g., changing the grammar source files and the hooked functions), Pig can now accept the nested cross statement, generate the logical plan, and reach the step of translating the logical plan into the physical plan. The major issue here is multiple inputs. The existing nested operators have only one input, so the current grammar seems to assume a single input for nested operators. However, cross can accept multiple inputs, so the input syntax in the nested block has to be changed as well.

How to translate the logical nested cross into the correct physical operators looks like the essential part of this project. I've spent time investigating the translation, as well as map/reduce plan compilation and map/reduce job execution.

During logical plan generation, both top-level and nested cross statements result in an LOCross instance. LogToPhyTranslationVisitor already has a function to visit LOCross. However, that function works for the top-level cross but not for the nested one. To see why it doesn't work for nested cross, here are the two map/reduce plans generated for the top-level and the nested cross operators, respectively:

1. top-level:
user = load 'user.txt' as (uid, region);
session = load 'session.txt' as (uid, region, duration);
A = group user by uid;
B = group session by uid;
C = cross A, B;
store C into 'test.out';

#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-38
Map Plan
Union[tuple] - scope-39
|
|---C: Local Rearrange[tuple]{tuple}(false) - scope-21
|   |   |
|   |   Project[int][0] - scope-22
|   |   |
|   |   Project[int][1] - scope-23
|   |
|   |---C: New For Each(true,true)[tuple] - scope-20
|       |   |
|       |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-18
|       |   |
|       |   |---Constant(2) - scope-16
|       |   |
|       |   |---Constant(0) - scope-17
|       |   |
|       |   Project[tuple][*] - scope-19
|       |
|       |---user: New For Each(false,false)[bag] - scope-5
|           |   |
|           |   Project[bytearray][0] - scope-1
|           |   |
|           |   Project[bytearray][1] - scope-3
|           |
|           |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Local Rearrange[tuple]{tuple}(false) - scope-29
    |   |
    |   Project[int][0] - scope-30
    |   |
    |   Project[int][1] - scope-31
    |
    |---C: New For Each(true,true)[tuple] - scope-28
        |   |
        |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
        |   |
        |   |---Constant(2) - scope-24
        |   |
        |   |---Constant(1) - scope-25
        |   |
        |   Project[tuple][*] - scope-27
        |
        |---session: New For Each(false,false,false)[bag] - scope-13
            |   |
            |   Project[bytearray][0] - scope-7
            |   |
            |   Project[bytearray][1] - scope-9
            |   |
            |   Project[bytearray][2] - scope-11
            |
            |---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6--------
Reduce Plan
C: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-35
|
|---POJoinPackage(true,true)[tuple] - scope-40--------
Global sort: false
----------------

2. nested:
user = load 'user.txt' as (uid, region);
session = load 'session.txt' as (uid, region, duration);
C = cogroup user by uid, session by uid;
D = foreach C {
    crossed = cross user, session;
    filtered = filter crossed by user::region == session::region;
    generate group, user, session;
};
store D into 'test.out';

#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-52
Map Plan
Union[tuple] - scope-53
|
|---C: Local Rearrange[tuple]{bytearray}(false) - scope-16
|   |   |
|   |   Project[bytearray][0] - scope-17
|   |
|   |---user: New For Each(false,false)[bag] - scope-5
|       |   |
|       |   Project[bytearray][0] - scope-1
|       |   |
|       |   Project[bytearray][1] - scope-3
|       |
|       |---user: Load(file:///home/zjshen/Workspace/eclipse/pig_test/user.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Local Rearrange[tuple]{bytearray}(false) - scope-18
    |   |
    |   Project[bytearray][0] - scope-19
    |
    |---session: New For Each(false,false,false)[bag] - scope-13
        |   |
        |   Project[bytearray][0] - scope-7
        |   |
        |   Project[bytearray][1] - scope-9
        |   |
        |   Project[bytearray][2] - scope-11
        |
        |---session: Load(file:///home/zjshen/Workspace/eclipse/pig_test/session.txt:org.apache.pig.builtin.PigStorage) - scope-6--------
Reduce Plan
D: Store(test.out:org.apache.pig.builtin.PigStorage) - scope-49
|
|---D: New For Each(false)[bag] - scope-48
    |   |
    |   RelationToExpressionProject[bag][*] - scope-20
    |   |
    |   |---filtered: Filter[bag] - scope-44
    |       |   |
    |       |   Equal To[boolean] - scope-47
    |       |   |
    |       |   |---Project[bytearray][1] - scope-45
    |       |   |
    |       |   |---Project[bytearray][3] - scope-46
    |       |
    |       |---crossed: New For Each(true,true)[tuple] - scope-43
    |           |   |
    |           |   Project[bag][1] - scope-41
    |           |   |
    |           |   Project[bag][2] - scope-42
    |           |
    |           |---Package[tuple]{tuple} - scope-24
    |               |
    |               |---crossed: Global Rearrange[tuple] - scope-23
    |                   |
    |                   |---crossed: Local Rearrange[tuple]{tuple}(false) - scope-30
    |                   |   |   |
    |                   |   |   Project[int][0] - scope-31
    |                   |   |   |
    |                   |   |   Project[int][1] - scope-32
    |                   |   |
    |                   |   |---crossed: New For Each(true,true)[tuple] - scope-29
    |                   |       |   |
    |                   |       |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-27
    |                   |       |   |
    |                   |       |   |---Constant(2) - scope-25
    |                   |       |   |
    |                   |       |   |---Constant(0) - scope-26
    |                   |       |   |
    |                   |       |   Project[tuple][*] - scope-28
    |                   |       |
    |                   |       |---Project[bag][1] - scope-21
    |                   |
    |                   |---crossed: Local Rearrange[tuple]{tuple}(false) - scope-38
    |                       |   |
    |                       |   Project[int][0] - scope-39
    |                       |   |
    |                       |   Project[int][1] - scope-40
    |                       |
    |                       |---crossed: New For Each(true,true)[tuple] - scope-37
    |                           |   |
    |                           |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-35
    |                           |   |
    |                           |   |---Constant(2) - scope-33
    |                           |   |
    |                           |   |---Constant(1) - scope-34
    |                           |   |
    |                           |   Project[tuple][*] - scope-36
    |                           |
    |                           |---Project[bag][2] - scope-22
    |
    |---C: Package[tuple]{bytearray} - scope-15--------
Global sort: false
----------------

The difference in the physical plan is not obvious because the translation procedure is the same: local rearrange -> global rearrange -> package -> physical foreach. However, when the physical plan is translated into the map/reduce plan, the difference becomes obvious. The physical operators belonging to the top-level cross are distributed across both the map and reduce stages: local rearrange is placed in the map stage; global rearrange is removed because its logic is inherently provided by map/reduce; package and foreach are combined to form POJoinPackage, which is placed in the reduce stage.
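The top-level pipeline described above can be imitated in a few lines of Python. This is only a sketch based on my reading of the plan: the names gfcross_tag and simulated_cross are hypothetical, the grid size of 2 is arbitrary, and the key scheme (a random slot in the tuple's own dimension, replication across the other dimensions) is an assumption about how GFCross behaves, not a copy of its code.

```python
import itertools
import random

def gfcross_tag(tup, num_inputs, input_index, groups_per_input, rng):
    """Return (key, input_index, tuple) records for one input tuple.

    Assumed GFCross behaviour: the tuple takes one random slot in its own
    dimension and is replicated across every slot of the other dimensions.
    """
    own_slot = rng.randrange(groups_per_input)
    ranges = [
        [own_slot] if dim == input_index else range(groups_per_input)
        for dim in range(num_inputs)
    ]
    return [(key, input_index, tup) for key in itertools.product(*ranges)]

def simulated_cross(inputs, groups_per_input=2, seed=0):
    rng = random.Random(seed)
    num_inputs = len(inputs)
    # Map stage: foreach with GFCross, then local rearrange on the key fields.
    tagged = [
        rec
        for idx, bag in enumerate(inputs)
        for tup in bag
        for rec in gfcross_tag(tup, num_inputs, idx, groups_per_input, rng)
    ]
    # Shuffle: stands in for global rearrange, grouping records by key.
    groups = {}
    for key, idx, tup in tagged:
        groups.setdefault(key, [[] for _ in range(num_inputs)])[idx].append(tup)
    # Reduce stage: package plus flattening foreach, a per-key product.
    out = []
    for per_input in groups.values():
        for combo in itertools.product(*per_input):
            out.append(tuple(itertools.chain.from_iterable(combo)))
    return out

users = [("u1", "us"), ("u2", "eu")]
sessions = [("u1", "us", 30), ("u2", "eu", 45), ("u1", "eu", 5)]
crossed = simulated_cross([users, sessions])
```

Each (user, session) pair meets at exactly one key, the one formed by the user's random slot and the session's random slot, so the per-key products together give the full cross product without duplicates. The point of the sketch is that the grouping step is done by the shuffle, which is exactly what is not available inside a nested block.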

On the other hand, the physical plan of the nested cross hasn't been translated into the same kind of map/reduce plan. This is because the physical operators of nested commands cannot be distributed across the map and reduce stages; they have to be evaluated locally within a single stage. This is where the problem occurs: the global rearrange and package operators end up inside the reduce stage. First, global rearrange is assumed to be replaced by Hadoop's shuffle/merge, so its data-processing logic is not implemented. Second, package reads its data from Hadoop's reduce function, and in the current implementation only one package can appear in the reduce stage, because the keyInfo member is set on only one POPackage instance.

Given this, LogToPhyTranslationVisitor should distinguish between a top-level and a nested cross when visiting LOCross (this may require a change to the logical plan). For a nested cross, some other operator (tentatively named POCross) needs to be generated in place of POGlobalRearrange and POPackage. It should provide similar functionality but operate locally. MRCompiler then needs a function to visit the POCross instance and attach it to the reduce stage.
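To make "operate locally" concrete, here is a rough Python sketch of what such an operator would compute for the nested example above. It is not Pig code: local_cross and cogroup are hypothetical helper names, and the field positions (1 for user::region, 3 for session::region) are taken from the Project[bytearray][1] and Project[bytearray][3] entries in the nested plan.

```python
import itertools

def local_cross(*bags):
    """Cross bags that already sit in one tuple; no shuffle is involved."""
    for combo in itertools.product(*bags):
        yield tuple(itertools.chain.from_iterable(combo))

def cogroup(users, sessions):
    """Stand-in for 'cogroup user by uid, session by uid'."""
    groups = {}
    for u in users:
        groups.setdefault(u[0], ([], []))[0].append(u)
    for s in sessions:
        groups.setdefault(s[0], ([], []))[1].append(s)
    return groups

users = [("u1", "us"), ("u2", "eu")]
sessions = [("u1", "us", 30), ("u1", "eu", 5), ("u2", "eu", 45)]

result = {}
for uid, (user_bag, session_bag) in cogroup(users, sessions).items():
    # crossed = cross user, session;
    crossed = local_cross(user_bag, session_bag)
    # filtered = filter crossed by user::region == session::region;
    result[uid] = [t for t in crossed if t[1] == t[3]]
```

Everything inside the loop body touches only the two bags of a single cogroup output tuple, which is why an operator of this shape could run entirely in the reduce stage, after the single Package that the cogroup already requires.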

Attached is the code I've modified so far.


> Nested cross
> ------------
>
>                 Key: PIG-1916
>                 URL: https://issues.apache.org/jira/browse/PIG-1916
>             Project: Pig
>          Issue Type: New Feature
>          Components: impl
>            Reporter: Daniel Dai
>              Labels: gsoc2011
>             Fix For: 0.10
>
>
> It is useful to have cross inside a nested foreach statement. One typical use case for nested foreach is that, after cogrouping two relations, we want to flatten the records with the same key and do some processing. This is naturally achieved by cross. E.g.:
> {code}
> C = cogroup user by uid, session by uid;
> D = foreach C {
>     crossed = cross user, session; -- To flatten two input bags
>     filtered = filter crossed by user::region == session::region;
>     result = foreach crossed generate processSession(user::age, user::gender, session::ip);  --Nested foreach Jira: PIG-1631
>     generate result;
> }
> {code}
> If we don't have cross, the user has to write a UDF that processes the bags user and session. That is much harder than writing a UDF that processes flattened tuples. This is especially true when we have a nested foreach statement (PIG-1631).
> This is a candidate project for Google summer of code 2011. More information about the program can be found at http://wiki.apache.org/pig/GSoc2011
