Posted to dev@pig.apache.org by "Mohit Sabharwal (JIRA)" <ji...@apache.org> on 2015/05/04 06:12:06 UTC

[jira] [Commented] (PIG-4190) Implement replicated join in Spark engine

    [ https://issues.apache.org/jira/browse/PIG-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14526220#comment-14526220 ] 

Mohit Sabharwal commented on PIG-4190:
--------------------------------------

FYI: [~kellyzly], [~praveenr019], [~xuefuz]

The attached patch implements FRJoin using the regular join
functionality available in Spark core.

The implementation is similar to the one for
Skew Join, except that:
 - FRJoin can have more than 2 inputs.
 - FRJoin supports left outer join.

Spark's join operator does not allow more
than two inputs. So the patch has to do an
extra mapPartitions() transformation before we
do the next join():
  join().mapPartitions().join()....
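
To make the chaining concrete, here is a rough sketch in plain Java
collections. It is not the code from the patch and does not use the Spark
API: join() and flatten() below are hypothetical stand-ins for Spark's
binary join() and for the mapPartitions() step, and the class name and
sample rows are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ChainedJoinSketch {

    // Hypothetical stand-in for Spark's binary join():
    // (K, V) joined with (K, W) gives (K, (V, W)).
    static List<Map.Entry<String, Map.Entry<List<String>, List<String>>>> join(
            List<Map.Entry<String, List<String>>> left,
            List<Map.Entry<String, List<String>>> right) {
        List<Map.Entry<String, Map.Entry<List<String>, List<String>>>> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> l : left) {
            for (Map.Entry<String, List<String>> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    Map.Entry<List<String>, List<String>> pair =
                            new SimpleEntry<>(l.getValue(), r.getValue());
                    out.add(new SimpleEntry<>(l.getKey(), pair));
                }
            }
        }
        return out;
    }

    // Hypothetical stand-in for the mapPartitions() step: turns (K, (V, W))
    // back into (K, tuple) by concatenating both sides, so that the result
    // has the shape the next binary join() expects.
    static List<Map.Entry<String, List<String>>> flatten(
            List<Map.Entry<String, Map.Entry<List<String>, List<String>>>> joined) {
        List<Map.Entry<String, List<String>>> out = new ArrayList<>();
        for (Map.Entry<String, Map.Entry<List<String>, List<String>>> e : joined) {
            List<String> tuple = new ArrayList<>(e.getValue().getKey());
            tuple.addAll(e.getValue().getValue());
            out.add(new SimpleEntry<>(e.getKey(), tuple));
        }
        return out;
    }

    // N-way join built as join().flatten().join().flatten()...
    static List<Map.Entry<String, List<String>>> joinAll(
            List<List<Map.Entry<String, List<String>>>> inputs) {
        List<Map.Entry<String, List<String>>> current = inputs.get(0);
        for (int i = 1; i < inputs.size(); i++) {
            current = flatten(join(current, inputs.get(i)));
        }
        return current;
    }

    // Builds one keyed row: (key, tuple fields).
    static Map.Entry<String, List<String>> row(String key, String... fields) {
        return new SimpleEntry<>(key, Arrays.asList(fields));
    }

    // Joins three made-up inputs on the name field and returns the output tuples.
    static List<List<String>> demo() {
        List<Map.Entry<String, List<String>>> a =
                Arrays.asList(row("alice", "alice", "20"), row("bob", "bob", "31"));
        List<Map.Entry<String, List<String>>> b = Arrays.asList(
                row("alice", "alice", "3.5"), row("bob", "bob", "2.9"), row("alice", "alice", "3.9"));
        List<Map.Entry<String, List<String>>> c =
                Arrays.asList(row("alice", "alice", "democrat"));
        List<List<String>> tuples = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : joinAll(Arrays.asList(a, b, c))) {
            tuples.add(e.getValue());
        }
        return tuples;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Every intermediate row here allocates a pair object and then a new
concatenated tuple, which is the per-row overhead of this approach.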
  
Spark does support the leftOuterJoin() operator,
so we use that directly.

The join().mapPartitions().join().... approach
leads to a lot of object creation, so it is perhaps
not the most efficient. The other way is to
create a CoGroupedRDD and then compute a cross
product of all the "bags". (Note that
Spark implements join() as cogroup + flatMapValues.
See: https://github.com/apache/spark/blob/2c32bef1790dac6f77ef9674f6106c2e24ea0338/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L504 )
We can implement this optimization in a future patch.
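
For illustration, the cogroup-then-cross-product idea could look roughly
like the sketch below, again in plain Java collections rather than the
Spark API. The cogroup() and crossProduct() helpers, the class name, and
the sample rows are all hypothetical; this is not how the future patch
would necessarily be written.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CogroupJoinSketch {

    // Hypothetical stand-in for a cogroup over N inputs:
    // for every key, collect one "bag" of tuples per input.
    static Map<String, List<List<List<String>>>> cogroup(
            List<List<Map.Entry<String, List<String>>>> inputs) {
        Map<String, List<List<List<String>>>> groups = new LinkedHashMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            for (Map.Entry<String, List<String>> e : inputs.get(i)) {
                List<List<List<String>>> bags = groups.get(e.getKey());
                if (bags == null) {
                    bags = new ArrayList<>();
                    for (int j = 0; j < inputs.size(); j++) {
                        bags.add(new ArrayList<>());
                    }
                    groups.put(e.getKey(), bags);
                }
                bags.get(i).add(e.getValue());
            }
        }
        return groups;
    }

    // Cross product of the bags for one key. Each output tuple is the
    // concatenation of one tuple from every bag. An empty bag yields no
    // output, which gives inner-join semantics.
    static List<List<String>> crossProduct(List<List<List<String>>> bags) {
        List<List<String>> result = new ArrayList<>();
        result.add(new ArrayList<>());
        for (List<List<String>> bag : bags) {
            List<List<String>> next = new ArrayList<>();
            for (List<String> partial : result) {
                for (List<String> tuple : bag) {
                    List<String> combined = new ArrayList<>(partial);
                    combined.addAll(tuple);
                    next.add(combined);
                }
            }
            result = next;
        }
        return result;
    }

    // N-way inner join: group once, then expand each key's bags.
    static List<List<String>> join(List<List<Map.Entry<String, List<String>>>> inputs) {
        List<List<String>> out = new ArrayList<>();
        for (List<List<List<String>>> bags : cogroup(inputs).values()) {
            out.addAll(crossProduct(bags));
        }
        return out;
    }

    // Builds one keyed row: (key, tuple fields).
    static Map.Entry<String, List<String>> row(String key, String... fields) {
        return new SimpleEntry<>(key, Arrays.asList(fields));
    }

    // Joins three made-up inputs on the name field.
    static List<List<String>> demo() {
        List<Map.Entry<String, List<String>>> a =
                Arrays.asList(row("alice", "alice", "20"), row("bob", "bob", "31"));
        List<Map.Entry<String, List<String>>> b = Arrays.asList(
                row("alice", "alice", "3.5"), row("bob", "bob", "2.9"), row("alice", "alice", "3.9"));
        List<Map.Entry<String, List<String>>> c =
                Arrays.asList(row("alice", "alice", "democrat"));
        return join(Arrays.asList(a, b, c));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the inputs are grouped once, and "bob" produces no output
because its bag for the third input is empty.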

> Implement replicated join in Spark engine
> -----------------------------------------
>
>                 Key: PIG-4190
>                 URL: https://issues.apache.org/jira/browse/PIG-4190
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Praveen Rachabattuni
>            Assignee: Mohit Sabharwal
>             Fix For: spark-branch
>
>         Attachments: PIG-4190.patch
>
>
> Related e2e tests: Union_7, Union_8, Union_13
> Sample script:
> a = load '/user/pig/tests/data/singlefile/studenttab10k' as (name, age, gpa);
> b = load '/user/pig/tests/data/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
> c = union a, b;
> d = load '/user/pig/tests/data/singlefile/votertab10k' as (name, age, registration, contributions);
> e = join c by name, d by name using 'replicated';
> store e into '/user/pig/out/praveenr-1411380943-nightly.conf/Union_7.out';
> Pig Stack Trace
> ---------------
> ERROR 0: java.lang.IllegalArgumentException: Spork unsupported PhysicalOperator: (Name: e: FRJoin[tuple] - scope-6 Operator Key: scope-6)
> org.apache.pig.backend.executionengine.ExecException: ERROR 0: java.lang.IllegalArgumentException: Spork unsupported PhysicalOperator: (Name: e: FRJoin[tuple] - scope-6 Operator Key: scope-6)
> 	at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:285)
> 	at org.apache.pig.PigServer.launchPlan(PigServer.java:1378)
> 	at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1363)
> 	at org.apache.pig.PigServer.execute(PigServer.java:1352)
> 	at org.apache.pig.PigServer.executeBatch(PigServer.java:403)
> 	at org.apache.pig.PigServer.executeBatch(PigServer.java:386)
> 	at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:170)
> 	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:233)
> 	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:204)
> 	at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
> 	at org.apache.pig.Main.run(Main.java:611)
> 	at org.apache.pig.Main.main(Main.java:164)
> Caused by: java.lang.IllegalArgumentException: Spork unsupported PhysicalOperator: (Name: e: FRJoin[tuple] - scope-6 Operator Key: scope-6)
> 	at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:239)
> 	at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:232)
> 	at org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:140)
> 	at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:279)
> 	... 11 more
> ================================================================================



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)