Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2015/05/05 08:38:06 UTC

[jira] [Commented] (PIG-4522) Remove unnecessary store and load when POSplit is encounted

    [ https://issues.apache.org/jira/browse/PIG-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14528019#comment-14528019 ] 

liyunzhang_intel commented on PIG-4522:
---------------------------------------

Suppose multiquery optimization is enabled (set "opt.multiquery=true" in conf/pig.properties).
testSplit.pig: 
{code}
A = load './testSplit.txt' as (f1:int, f2:int,f3:int);
split A into x if f1<7, y if f2==5, z if (f3<6 or f3>6);
store x into './testSplit_x.out';
store y into './testSplit_y.out';
store z into './testSplit_z.out';
{code}
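SPLIT evaluates every branch condition against each tuple of A, so one tuple can land in several of x, y and z, or in none of them. The Java sketch below models that behavior with the three conditions from testSplit.pig. The class and method names are hypothetical (not Pig APIs), and each tuple is reduced to an int[] whose indexes 0..2 stand for f1..f3. It shows that a single pass over the input is enough to feed all three branches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of SPLIT semantics; not a Pig API.
class SplitSketch {
    // Tests every tuple against every branch condition in one scan of the input.
    // A tuple is added to each branch whose condition holds (several, or none).
    static Map<String, List<int[]>> split(List<int[]> rows,
                                          Map<String, Predicate<int[]>> branches) {
        Map<String, List<int[]>> out = new LinkedHashMap<>();
        for (String name : branches.keySet()) {
            out.put(name, new ArrayList<>());
        }
        for (int[] row : rows) {
            for (Map.Entry<String, Predicate<int[]>> b : branches.entrySet()) {
                if (b.getValue().test(row)) {
                    out.get(b.getKey()).add(row);
                }
            }
        }
        return out;
    }

    // Branch conditions of testSplit.pig; row[0..2] stand for f1..f3.
    static Map<String, Predicate<int[]>> testSplitBranches() {
        Map<String, Predicate<int[]>> branches = new LinkedHashMap<>();
        branches.put("x", r -> r[0] < 7);
        branches.put("y", r -> r[1] == 5);
        branches.put("z", r -> r[2] < 6 || r[2] > 6);
        return branches;
    }

    public static void main(String[] args) {
        List<int[]> rows = List.of(new int[] {1, 5, 6}, new int[] {9, 2, 6});
        split(rows, testSplitBranches())
                .forEach((name, tuples) -> System.out.println(name + ": " + tuples.size()));
    }
}
```

With the two sample rows, main prints x: 1, y: 1 and z: 0. The first row matches both x and y, and the second matches no branch at all.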

Run it in MapReduce mode:
./pig -x mapred testSplit.pig
The resulting MR plan has 4 MapReduce nodes (scope-31, scope-34, scope-36, scope-38):
{code}
#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-31
Map Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-32
|
|---A: New For Each(false,false,false)[bag] - scope-10
    |   |
    |   Cast[int] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[int] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Global sort: false
----------------

MapReduce node scope-34
Map Plan
x: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_x.out:org.apache.pig.builtin.PigStorage) - scope-16
|
|---x: Filter[bag] - scope-12
    |   |
    |   Less Than[boolean] - scope-15
    |   |
    |   |---Project[int][0] - scope-13
    |   |
    |   |---Constant(7) - scope-14
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-33--------
Global sort: false
----------------

MapReduce node scope-36
Map Plan
y: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_y.out:org.apache.pig.builtin.PigStorage) - scope-21
|
|---y: Filter[bag] - scope-17
    |   |
    |   Equal To[boolean] - scope-20
    |   |
    |   |---Project[int][1] - scope-18
    |   |
    |   |---Constant(5) - scope-19
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-35--------
Global sort: false
----------------

MapReduce node scope-38
Map Plan
z: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_z.out:org.apache.pig.builtin.PigStorage) - scope-30
|
|---z: Filter[bag] - scope-22
    |   |
    |   Or[boolean] - scope-29
    |   |
    |   |---Less Than[boolean] - scope-25
    |   |   |
    |   |   |---Project[int][2] - scope-23
    |   |   |
    |   |   |---Constant(6) - scope-24
    |   |
    |   |---Greater Than[boolean] - scope-28
    |       |
    |       |---Project[int][2] - scope-26
    |       |
    |       |---Constant(6) - scope-27
    |
    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-37--------
Global sort: false
----------------
{code}

After [org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer#visit|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java#L696] is executed, the MR plan contains only 1 MapReduce node:
{code}
Split - scope-39
|   |
|   x: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_x.out:org.apache.pig.builtin.PigStorage) - scope-16
|   |
|   |---x: Filter[bag] - scope-12
|       |   |
|       |   Less Than[boolean] - scope-15
|       |   |
|       |   |---Project[int][0] - scope-13
|       |   |
|       |   |---Constant(7) - scope-14
|   |
|   y: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_y.out:org.apache.pig.builtin.PigStorage) - scope-21
|   |
|   |---y: Filter[bag] - scope-17
|       |   |
|       |   Equal To[boolean] - scope-20
|       |   |
|       |   |---Project[int][1] - scope-18
|       |   |
|       |   |---Constant(5) - scope-19
|   |
|   z: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_z.out:org.apache.pig.builtin.PigStorage) - scope-30
|   |
|   |---z: Filter[bag] - scope-22
|       |   |
|       |   Or[boolean] - scope-29
|       |   |
|       |   |---Less Than[boolean] - scope-25
|       |   |   |
|       |   |   |---Project[int][2] - scope-23
|       |   |   |
|       |   |   |---Constant(6) - scope-24
|       |   |
|       |   |---Greater Than[boolean] - scope-28
|       |       |
|       |       |---Project[int][2] - scope-26
|       |       |
|       |       |---Constant(6) - scope-27
|
|---A: New For Each(false,false,false)[bag] - scope-10
    |   |
    |   Cast[int] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[int] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-0
{code}
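The effect of the multiquery merge on this plan can be modeled roughly as follows. This is a hypothetical, much simplified sketch, not MultiQueryOptimizer's actual code: a node is reduced to its input path, a list of operator names and its output path (paths shortened), and the checks the real optimizer performs (for example, that a consumer has no other predecessors) are omitted. The producer's temporary output is matched with the nodes that load it, and those nodes become branches of a Split fed directly by the producer pipeline, so the temporary Store and Loads disappear.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the multiquery merge; not Pig's MROperPlan API.
class MultiQuerySketch {
    // A plan node: where it reads from, its operator pipeline, where it writes to.
    record Node(String name, String load, List<String> pipeline, String store) {}

    // One node whose pipeline ends in a Split; each branch is a former consumer node.
    record Merged(String load, List<String> pipeline, List<Node> splitBranches) {}

    static Merged merge(Node producer, List<Node> plan) {
        List<Node> consumers = new ArrayList<>();
        for (Node n : plan) {
            // A consumer is a node that loads exactly the file the producer stores.
            if (n != producer && n.load().equals(producer.store())) {
                consumers.add(n);
            }
        }
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("nothing loads " + producer.store());
        }
        // The producer's temporary Store and the consumers' Loads are dropped:
        // the consumer pipelines become Split branches fed by the producer pipeline.
        return new Merged(producer.load(), producer.pipeline(), consumers);
    }

    public static void main(String[] args) {
        String tmp = "/tmp/temp207422844/tmp-1807440245";
        Node a = new Node("scope-31", "testSplit.txt", List.of("ForEach(scope-10)"), tmp);
        Node x = new Node("scope-34", tmp, List.of("Filter(scope-12)"), "testSplit_x.out");
        Node y = new Node("scope-36", tmp, List.of("Filter(scope-17)"), "testSplit_y.out");
        Node z = new Node("scope-38", tmp, List.of("Filter(scope-22)"), "testSplit_z.out");
        Merged m = merge(a, List.of(a, x, y, z));
        System.out.println(m.load() + " -> " + m.pipeline()
                + " -> Split with " + m.splitBranches().size() + " branches");
    }
}
```

For the four nodes of the unoptimized plan, main reports a single node reading testSplit.txt whose ForEach feeds a Split with 3 branches, matching the shape of the merged plan.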

We could also add a multiquery optimizer to Spark. In my view, if the Spark plan is optimized the way the MR plan is, there is no need to use RDD.cache() to avoid the repeated LOADs, and a new SparkOperator is still needed when POSplit is encountered.
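To make the trade-off concrete: Spark's RDD.cache() keeps the loaded data in memory so that each additional consumer does not re-read the input, whereas merging the branches behind a split reads the input only once in the first place. The sketch below is a hypothetical illustration in plain Java (no Spark or Pig classes; each record is reduced to a single int and the conditions only mirror the shape of those in testSplit.pig). It counts full passes over a source: three independent pipelines, as in the unoptimized plan, scan it three times, while one merged pass scans it once and yields the same per-branch counts.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical illustration; plain Java, no Spark or Pig classes.
class ScanCountSketch {
    // Input that records how many full passes are made over it (one per "LOAD").
    static final class CountingSource implements Iterable<Integer> {
        private final List<Integer> data;
        int scans = 0;

        CountingSource(List<Integer> data) {
            this.data = data;
        }

        @Override
        public Iterator<Integer> iterator() {
            scans++;
            return data.iterator();
        }
    }

    // Unoptimized shape: every branch is its own pipeline and re-reads the input.
    static int[] separatePipelines(CountingSource src, List<IntPredicate> branches) {
        int[] counts = new int[branches.size()];
        for (int i = 0; i < branches.size(); i++) {
            for (int v : src) {
                if (branches.get(i).test(v)) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    // Merged shape: one read of the input feeds every branch, as behind a Split.
    static int[] mergedPass(CountingSource src, List<IntPredicate> branches) {
        int[] counts = new int[branches.size()];
        for (int v : src) {
            for (int i = 0; i < branches.size(); i++) {
                if (branches.get(i).test(v)) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<IntPredicate> branches = List.of(v -> v < 7, v -> v == 5, v -> v < 6 || v > 6);
        CountingSource a = new CountingSource(List.of(1, 5, 6, 9));
        CountingSource b = new CountingSource(List.of(1, 5, 6, 9));
        separatePipelines(a, branches);
        mergedPass(b, branches);
        System.out.println("separate: " + a.scans + " scans, merged: " + b.scans + " scan");
    }
}
```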

However, I found that in some cases a new SparkOperator should not be created when POSplit is encountered:
*testAccumulator.join.sh*
{code}
A = load './testAccumulator.txt' as (id:int,f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
E = order B by f desc;
F = E.f;
generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
E = order B by f desc;
F = E.f;
generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group; 
store H into 'testAccumulator.join.out';
explain H;
{code}

*Physical Plan:*
An implicit POSplit (scope-19) is generated in the physical plan, because relation C is consumed by both D and G.
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---H: Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---C: Split - scope-19   // here an implicit Split is generated
            |               |
            |               |---C: Package(Packager)[tuple]{int} - scope-16
            |                   |
            |                   |---C: Global Rearrange[tuple] - scope-15
            |                       |
            |                       |---C: Local Rearrange[tuple]{int}(false) - scope-17
            |                           |   |
            |                           |   Project[int][0] - scope-18
            |                           |
            |                           |---B: New For Each(false,false,false)[bag] - scope-14
            |                               |   |
            |                               |   Project[int][0] - scope-7
            |                               |   |
            |                               |   Project[bytearray][1] - scope-9
            |                               |   |
            |                               |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
            |                               |   |
            |                               |   |---Project[int][0] - scope-11
            |                               |
            |                               |---A: New For Each(false,false)[bag] - scope-6
            |                                   |   |
            |                                   |   Cast[int] - scope-2
            |                                   |   |
            |                                   |   |---Project[bytearray][0] - scope-1
            |                                   |   |
            |                                   |   Project[bytearray][1] - scope-4
            |                                   |
            |                                   |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---C: Split - scope-19
                            |
                            |---C: Package(Packager)[tuple]{int} - scope-16
                                |
                                |---C: Global Rearrange[tuple] - scope-15
                                    |
                                    |---C: Local Rearrange[tuple]{int}(false) - scope-17
                                        |   |
                                        |   Project[int][0] - scope-18
                                        |
                                        |---B: New For Each(false,false,false)[bag] - scope-14
                                            |   |
                                            |   Project[int][0] - scope-7
                                            |   |
                                            |   Project[bytearray][1] - scope-9
                                            |   |
                                            |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                                            |   |
                                            |   |---Project[int][0] - scope-11
                                            |
                                            |---A: New For Each(false,false)[bag] - scope-6
                                                |   |
                                                |   Cast[int] - scope-2
                                                |   |
                                                |   |---Project[bytearray][0] - scope-1
                                                |   |
                                                |   Project[bytearray][1] - scope-4
                                                |
                                                |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0

{code}
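The implicit split sits on C because C has two consumers, D and G. The hypothetical sketch below (not Pig's own code, which works on its logical plan rather than on a map of aliases) finds such relations from a map of each relation to the relations it reads. The nested aliases E and F are left out because they are local to each foreach block.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Pig itself works on its logical plan, not on a map of aliases.
class ImplicitSplitSketch {
    // inputsOf maps each relation to the relations it reads.
    // Returns the relations read by more than one downstream relation.
    static List<String> multiConsumerRelations(Map<String, List<String>> inputsOf) {
        Map<String, Integer> consumerCount = new LinkedHashMap<>();
        for (List<String> inputs : inputsOf.values()) {
            for (String input : inputs) {
                consumerCount.merge(input, 1, Integer::sum);
            }
        }
        List<String> result = new ArrayList<>();
        consumerCount.forEach((relation, n) -> {
            if (n > 1) {
                result.add(relation);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Top-level data flow of testAccumulator.join (nested E and F omitted).
        Map<String, List<String>> inputsOf = new LinkedHashMap<>();
        inputsOf.put("B", List.of("A"));
        inputsOf.put("C", List.of("B"));
        inputsOf.put("D", List.of("C"));
        inputsOf.put("G", List.of("C"));
        inputsOf.put("H", List.of("D", "G"));
        System.out.println(multiConsumerRelations(inputsOf)); // [C]
    }
}
```

Only C is reported, which is the relation behind Split scope-19 in both branches of the physical plan.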

*Spark Plan*
POSort (scope-28) should be removed by [SecondaryKeyOptimizerUtil.java#applySecondaryKeySort|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java#L329], but it is not removed here: SecondaryKeyOptimizer is only applied when a group followed by a foreach is found within one SparkOperator, and POSplit (scope-19 in the physical plan) puts the group and the foreach into different operators. For the same reason, if you run testAccumulator.join.sh in MR mode, POSort is not removed even when secondary key optimization is enabled.

{code}
Spark node scope-58
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage) - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16
    |
    |---Global Rearrange[tuple] - scope-15
        |
        |---C: Local Rearrange[tuple]{int}(false) - scope-17
            |   |
            |   Project[int][0] - scope-18
            |
            |---B: New For Each(false,false,false)[bag] - scope-14
                |   |
                |   Project[int][0] - scope-7
                |   |
                |   Project[bytearray][1] - scope-9
                |   |
                |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                |   |
                |   |---Project[int][0] - scope-11
                |
                |---A: New For Each(false,false)[bag] - scope-6
                    |   |
                    |   Cast[int] - scope-2
                    |   |
                    |   |---Project[bytearray][0] - scope-1
                    |   |
                    |   Project[bytearray][1] - scope-4
                    |
                    |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
 
Spark node scope-64
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28    
            |       |               |   |                                           
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage) - scope-60
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage) - scope-62--------
{code}
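The applicability condition described above can be written as a small check. In this hypothetical sketch (a simplification of the idea, not the logic in SecondaryKeyOptimizerUtil), an operator is a list of physical operator names from input to output, and the nested sort is considered removable only when a Package is directly followed by a ForEach with a nested sort inside the same operator. Neither Spark node above satisfies it, while the same pipeline without the split would.

```java
import java.util.List;

// Hypothetical sketch; operator names are plain strings, not Pig physical operators.
class SecondaryKeySketch {
    // True if, within ONE operator's pipeline (input to output), a Package is
    // directly followed by a ForEach containing a nested sort.
    static boolean secondaryKeyApplicable(List<String> pipeline) {
        for (int i = 0; i + 1 < pipeline.size(); i++) {
            if (pipeline.get(i).equals("Package")
                    && pipeline.get(i + 1).equals("ForEach[nested Sort]")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Spark node scope-58 stops right after the grouping: Package -> Store(tmp).
        List<String> scope58 = List.of("Load", "ForEach", "ForEach", "LocalRearrange",
                "GlobalRearrange", "Package", "Store");
        // D branch of Spark node scope-64: the nested sort starts from the tmp Load.
        List<String> scope64 = List.of("Load", "Filter", "ForEach[nested Sort]",
                "LocalRearrange", "GlobalRearrange", "Package", "ForEach", "Store");
        // Without the split, grouping and the nested foreach stay in one operator.
        List<String> unsplit = List.of("Load", "ForEach", "ForEach", "LocalRearrange",
                "GlobalRearrange", "Package", "ForEach[nested Sort]", "Store");
        System.out.println(secondaryKeyApplicable(scope58)); // false
        System.out.println(secondaryKeyApplicable(scope64)); // false
        System.out.println(secondaryKeyApplicable(unsplit)); // true
    }
}
```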

> Remove unnecessary store and load when POSplit is encounted
> -----------------------------------------------------------
>
>                 Key: PIG-4522
>                 URL: https://issues.apache.org/jira/browse/PIG-4522
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4522.patch
>
>
> pig script:
> {code}
> A = load './testSplit.txt' as (f1:int, f2:int,f3:int);
> split A into x if f1<7, y if f2==5, z if (f3<6 or f3>6);
> store x into './testSplit_x.out';
> store y into './testSplit_y.out';
> store z into './testSplit_z.out';
> explain x; 
> explain y;
> explain z;
> {code}
> spark plan:
> {code}
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-17->scope-20 
> scope-20
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-17
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-1477385839:org.apache.pig.impl.io.InterStorage) - scope-18
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
>     |   |
>     |   Cast[int] - scope-2
>     |   |
>     |   |---Project[bytearray][0] - scope-1
>     |   |
>     |   Cast[int] - scope-5
>     |   |
>     |   |---Project[bytearray][1] - scope-4
>     |   |
>     |   Cast[int] - scope-8
>     |   |
>     |   |---Project[bytearray][2] - scope-7
>     |
>     |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
> Spark node scope-20
> x: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-16
> |
> |---x: Filter[bag] - scope-12
>     |   |
>     |   Less Than[boolean] - scope-15
>     |   |
>     |   |---Project[int][0] - scope-13
>     |   |
>     |   |---Constant(7) - scope-14
>     |
>     |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-1477385839:org.apache.pig.impl.io.InterStorage) - scope-19--------
> #-----------------------------------------------------#
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-38->scope-41 
> scope-41
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-38
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-918933337:org.apache.pig.impl.io.InterStorage) - scope-39
> |
> |---A: New For Each(false,false,false)[bag] - scope-31
>     |   |
>     |   Cast[int] - scope-23
>     |   |
>     |   |---Project[bytearray][0] - scope-22
>     |   |
>     |   Cast[int] - scope-26
>     |   |
>     |   |---Project[bytearray][1] - scope-25
>     |   |
>     |   Cast[int] - scope-29
>     |   |
>     |   |---Project[bytearray][2] - scope-28
>     |
>     |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-21--------
> Spark node scope-41
> y: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-37
> |
> |---y: Filter[bag] - scope-33
>     |   |
>     |   Equal To[boolean] - scope-36
>     |   |
>     |   |---Project[int][1] - scope-34
>     |   |
>     |   |---Constant(5) - scope-35
>     |
>     |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-918933337:org.apache.pig.impl.io.InterStorage) - scope-40--------
> #-----------------------------------------------------#
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-63->scope-66 
> scope-66
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-63
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp1444529161:org.apache.pig.impl.io.InterStorage) - scope-64
> |
> |---A: New For Each(false,false,false)[bag] - scope-52
>     |   |
>     |   Cast[int] - scope-44
>     |   |
>     |   |---Project[bytearray][0] - scope-43
>     |   |
>     |   Cast[int] - scope-47
>     |   |
>     |   |---Project[bytearray][1] - scope-46
>     |   |
>     |   Cast[int] - scope-50
>     |   |
>     |   |---Project[bytearray][2] - scope-49
>     |
>     |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-42--------
> Spark node scope-66
> z: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-62
> |
> |---z: Filter[bag] - scope-54
>     |   |
>     |   Or[boolean] - scope-61
>     |   |
>     |   |---Less Than[boolean] - scope-57
>     |   |   |
>     |   |   |---Project[int][2] - scope-55
>     |   |   |
>     |   |   |---Constant(6) - scope-56
>     |   |
>     |   |---Greater Than[boolean] - scope-60
>     |       |
>     |       |---Project[int][2] - scope-58
>     |       |
>     |       |---Constant(6) - scope-59
>     |
>     |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp1444529161:org.apache.pig.impl.io.InterStorage) - scope-65--------
> {code}
> Scope-18 (Store) and scope-19 (Load) are unnecessary and should be removed.
> Scope-39 (Store) and scope-40 (Load) are unnecessary and should be removed.
> Scope-64 (Store) and scope-65 (Load) are unnecessary and should be removed.
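Each of the redundant pairs listed above is a Store and a Load that use the same temporary InterStorage path. The following is a hypothetical sketch of how such pairs could be detected from a flattened list of plan operators; the Op record and its fields are assumptions for illustration, not Pig classes, and the paths are shortened.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Op is an assumed flattened view of plan operators, not a Pig class.
class TmpStoreLoadSketch {
    record Op(String scope, String kind, String path, String storage) {}

    // Pairs every Store into a temporary InterStorage file with the Loads that read it back.
    static List<String> redundantPairs(List<Op> ops) {
        List<String> pairs = new ArrayList<>();
        for (Op store : ops) {
            if (!store.kind().equals("Store") || !store.storage().equals("InterStorage")) {
                continue;
            }
            for (Op load : ops) {
                if (load.kind().equals("Load") && load.path().equals(store.path())) {
                    pairs.add(store.scope() + " -> " + load.scope());
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String tmp = "/tmp/temp-1920285848/tmp-1477385839";
        List<Op> ops = List.of(
                new Op("scope-0", "Load", "testSplit.txt", "PigStorage"),
                new Op("scope-18", "Store", tmp, "InterStorage"),
                new Op("scope-19", "Load", tmp, "InterStorage"),
                new Op("scope-16", "Store", "fakefile", "PigStorage"));
        System.out.println(redundantPairs(ops)); // [scope-18 -> scope-19]
    }
}
```

The user-facing Load of testSplit.txt and the final PigStorage Store are not reported, because only InterStorage stores are treated as temporary.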



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)