Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/02/23 04:50:18 UTC

[jira] [Updated] (PIG-4771) Implement FR Join for spark engine

     [ https://issues.apache.org/jira/browse/PIG-4771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4771:
----------------------------------
    Attachment: PIG-4771.patch

This patch reuses the algorithm in POFRJoin to implement FRJoin (fragment-replicate join) for Pig on Spark.
The following example illustrates the feature.
frJoin.pig
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
C = filter B by id > 100;
D = join A by (id,name), C by (id,name) using 'replicated';
store D into './testFRJoin.out';
{code}
Physical Plan
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-17
|
|---D: FRJoin[tuple] - scope-11
    |   |
    |   Project[bytearray][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-8
    |   |
    |   Project[bytearray][0] - scope-9
    |   |
    |   Project[bytearray][1] - scope-10
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
    |
    |---C: Filter[bag] - scope-2
        |   |
        |   Greater Than[boolean] - scope-6
        |   |
        |   |---Cast[int] - scope-4
        |   |   |
        |   |   |---Project[bytearray][0] - scope-3
        |   |
        |   |---Constant(100) - scope-5
        |
        |---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-1
{code}

Spark plan
{code}
Spark node scope-40
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-862810646/tmp-1458847763:org.apache.pig.impl.io.InterStorage) - scope-41
|
|---C: Filter[bag] - scope-23
    |   |
    |   Greater Than[boolean] - scope-27
    |   |
    |   |---Cast[int] - scope-25
    |   |   |
    |   |   |---Project[bytearray][0] - scope-24
    |   |
    |   |---Constant(100) - scope-26
    |
    |---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-22--------

Spark node scope-39
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/testFRJoin.out:org.apache.pig.builtin.PigStorage) - scope-38
|
|---D: FRJoin[tuple] - scope-32
    |   |
    |   Project[bytearray][0] - scope-28
    |   |
    |   Project[bytearray][1] - scope-29
    |   |
    |   Project[bytearray][0] - scope-30
    |   |
    |   Project[bytearray][1] - scope-31
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-21--------
{code}

In SparkCompiler#visitFRJoin, we create a SparkOperator that stores the result of the replicated input to a temporary file on HDFS. That temporary file is then loaded in POFRJoin#setUpHashMap.

*Why create a new SparkOperator that loads the file, stores it to a temporary file, and then loads it again in POFRJoin#setUpHashMap? Why not load the file directly in POFRJoin#setUpHashMap?*
Because we cannot guarantee that every predecessor of FRJoin in the physical plan is a POLoad. In the example above, the predecessors of FRJoin are a POFilter (C) and a POLoad (A).
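
For readers unfamiliar with fragment-replicate joins, here is a minimal Java sketch of the build-and-probe idea behind a hash map such as the one set up in POFRJoin#setUpHashMap. It is not code from the patch. The class FrJoinSketch, its methods, and the in-memory lists (standing in for the temporary HDFS file holding C and for the streamed relation A) are all hypothetical, and rows are plain String arrays rather than Pig tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names come from PIG-4771.patch.
final class FrJoinSketch {

    // Build phase: index the small (replicated) relation by its join key.
    // keyCols lists the column positions forming the key, e.g. {0, 1} for (id, name).
    static Map<List<String>, List<String[]>> buildTable(List<String[]> replicated, int[] keyCols) {
        Map<List<String>, List<String[]>> table = new HashMap<>();
        for (String[] row : replicated) {
            table.computeIfAbsent(key(row, keyCols), k -> new ArrayList<>()).add(row);
        }
        return table;
    }

    // Probe phase: stream the large relation and emit one joined row per match.
    static List<String[]> join(List<String[]> large, int[] largeKeyCols,
                               List<String[]> replicated, int[] replKeyCols) {
        Map<List<String>, List<String[]>> table = buildTable(replicated, replKeyCols);
        List<String[]> out = new ArrayList<>();
        for (String[] left : large) {
            List<String[]> matches = table.get(key(left, largeKeyCols));
            if (matches == null) {
                continue; // inner join: rows without a match are dropped
            }
            for (String[] right : matches) {
                // Concatenate the columns of the left row and the matching right row.
                String[] joined = Arrays.copyOf(left, left.length + right.length);
                System.arraycopy(right, 0, joined, left.length, right.length);
                out.add(joined);
            }
        }
        return out;
    }

    // Extract the join key as a List so that HashMap compares it by content.
    private static List<String> key(String[] row, int[] cols) {
        List<String> k = new ArrayList<>(cols.length);
        for (int c : cols) {
            k.add(row[c]);
        }
        return k;
    }

    public static void main(String[] args) {
        // A: (id, name, n); C: (id, name), assumed to be already filtered by id > 100.
        List<String[]> a = Arrays.asList(
                new String[] {"101", "alice", "x"},
                new String[] {"50", "bob", "y"},
                new String[] {"200", "carol", "z"});
        List<String[]> c = Arrays.asList(
                new String[] {"101", "alice"},
                new String[] {"200", "dave"});
        int[] keyCols = {0, 1};
        for (String[] row : join(a, keyCols, c, keyCols)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

The sketch also shows why the replicated side has to be fully materialized before the join starts: the whole of C must be in the hash map before the first row of A is probed, which is the role the temporary file plays in the Spark plan above.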

*How do we guarantee that the replicated files are accessible to the Spark workers?*
Replicated files are stored in HDFS, where Spark workers can access them. We set mapred.submit.replication to "10" so that more replicas of the replicated files exist and Spark workers are more likely to read the data locally. We do not use the Distributed Cache (a MapReduce feature) as MR mode does, because we cannot assume that users have MapReduce installed when they run Pig on Spark.


> Implement FR Join for spark engine
> ----------------------------------
>
>                 Key: PIG-4771
>                 URL: https://issues.apache.org/jira/browse/PIG-4771
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4771.patch
>
>
> The current code base (fd31fda) uses a regular join in place of FR join. We need to implement FR join.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)