Posted to dev@pig.apache.org by Praveen R <pr...@sigmoidanalytics.com> on 2015/03/13 08:51:44 UTC

Review Request 32031: PIG-4193: Make collected group work with Spark

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32031/
-----------------------------------------------------------

Review request for pig, liyun zhang and Mohit Sabharwal.


Bugs: PIG-4193
    https://issues.apache.org/jira/browse/PIG-4193


Repository: pig-git


Description
-------

Moved the getNextTuple(boolean proceed) method from POCollectedGroup to POCollectedGroupSpark.

When used with MR, collected group performs the group operation on the map side after ensuring that all data for the same key resides in a single map task. In Spark, this behaviour is achieved by a single mapPartitions function that applies the POCollectedGroup operator.

TODO:
- Avoid using rdd.count() in CollectedGroupConverter.
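
For illustration, a minimal standalone Spark sketch of the idea (this is not the patch's actual converter; it assumes the Spark 2.x+ Java API, where the mapPartitions function returns an Iterator, and all class and variable names are made up for the example). Because all rows for a key are assumed to be clustered within one partition, the grouping happens entirely inside mapPartitions and triggers no shuffle:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class MapSideGroupSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("map-side-group").setMaster("local[2]"));

            // Assume the loader guarantees all records for a key sit in one partition.
            JavaRDD<Tuple2<String, Integer>> rows = sc.parallelize(Arrays.asList(
                    new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)), 1);

            // Group within each partition; no shuffle is triggered.
            JavaRDD<Tuple2<String, List<Integer>>> grouped = rows.mapPartitions(it -> {
                Map<String, List<Integer>> groups = new LinkedHashMap<>();
                while (it.hasNext()) {
                    Tuple2<String, Integer> t = it.next();
                    groups.computeIfAbsent(t._1(), k -> new ArrayList<>()).add(t._2());
                }
                List<Tuple2<String, List<Integer>>> out = new ArrayList<>();
                groups.forEach((k, v) -> out.add(new Tuple2<>(k, v)));
                return out.iterator();
            });

            grouped.collect().forEach(System.out::println);
            sc.stop();
        }
    }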


Diffs
-----

  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f2f18e52e083b3e8e90ba02d07f12bcbc9be859 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java ca7a45f33320064e22628b40b34be7b9f7b07c36 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 3d04ba11855c39960e00d6f51b66654d1c70ebad 
  src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION 

Diff: https://reviews.apache.org/r/32031/diff/


Testing
-------

Ran TestCollectedGroup; the results are unchanged, with no new failures or new passes.


Thanks,

Praveen R


Re: Review Request 32031: PIG-4193: Make collected group work with Spark

Posted by Mohit Sabharwal <mo...@cloudera.com>.

> On March 27, 2015, 6:40 a.m., Mohit Sabharwal wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java, line 43
> > <https://reviews.apache.org/r/32031/diff/2/?file=893978#file893978line43>
> >
> >     Do we necessarily need to coalesce to a single partition in order to count the number of elements in the RDD?
> >     
> >     Also, we later do the processing one partition at a time, using mapPartitions:
> >     rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true).rdd();
> >     
> >     Isn't the computed count inaccurate? (It represents all elements in the RDD, not just the elements in one partition - and mapPartitions processes one partition at a time.)
> >     
> >     Also, are we processing one partition at a time (mapPartitions vs. map) here mostly for efficiency?

Ah, ignore my comment about the count being inaccurate, since it is accumulated across invocations of collectedGroupFunction.


- Mohit


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32031/#review78021
-----------------------------------------------------------


On March 13, 2015, 10:42 a.m., Praveen Rachabattuni wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32031/
> -----------------------------------------------------------
> 
> (Updated March 13, 2015, 10:42 a.m.)
> 
> 
> Review request for pig, liyun zhang and Mohit Sabharwal.
> 
> 
> Bugs: PIG-4193
>     https://issues.apache.org/jira/browse/PIG-4193
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Moved the getNextTuple(boolean proceed) method from POCollectedGroup to POCollectedGroupSpark.
> 
> When used with MR, collected group performs the group operation on the map side after ensuring that all data for the same key resides in a single map task. In Spark, this behaviour is achieved by a single mapPartitions function that applies the POCollectedGroup operator.
> 
> TODO:
> - Avoid using rdd.count() in CollectedGroupConverter.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f2f18e52e083b3e8e90ba02d07f12bcbc9be859 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java ca7a45f33320064e22628b40b34be7b9f7b07c36 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 3d04ba11855c39960e00d6f51b66654d1c70ebad 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/32031/diff/
> 
> 
> Testing
> -------
> 
> Ran TestCollectedGroup; the results are unchanged, with no new failures or new passes.
> 
> 
> Thanks,
> 
> Praveen Rachabattuni
> 
>


Re: Review Request 32031: PIG-4193: Make collected group work with Spark

Posted by Mohit Sabharwal <mo...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32031/#review78021
-----------------------------------------------------------


Thanks for the patch, Praveen!

I had a few questions.

Can't we just set "endOfAllInput" to true when we encounter the last tuple in CollectedGroupFunction (instead of passing a special arg to poCollectedGroup.getNextTuple())?

i.e. poCollectedGroup.getPlans().get(0).endOfAllInput = true
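
A minimal standalone Java sketch of that idea (process() and the String "tuples" here are made-up placeholders, not Pig's actual types): the partition iterator itself tells us when the last tuple arrives, so no extra argument needs to be threaded through getNextTuple().

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class EndOfInputSketch {
        public static void main(String[] args) {
            List<String> partition = Arrays.asList("t1", "t2", "t3");
            Iterator<String> input = partition.iterator();
            while (input.hasNext()) {
                String tuple = input.next();
                // True only for the last tuple of the partition.
                boolean endOfAllInput = !input.hasNext();
                process(tuple, endOfAllInput);
            }
        }

        static void process(String tuple, boolean endOfAllInput) {
            // In CollectedGroupFunction this is where the flag would be set
            // on the operator plan instead of being passed to getNextTuple().
            System.out.println(tuple + " endOfAllInput=" + endOfAllInput);
        }
    }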


src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
<https://reviews.apache.org/r/32031/#comment126402>

    From Rohini's comment on PIG-4193, doesn't the change to POCollectedGroup need to go into a separate patch?



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126400>

    Do we necessarily need to coalesce to a single partition in order to count the number of elements in the RDD?
    
    Also, we later do the processing one partition at a time, using mapPartitions:
    rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true).rdd();
    
    Isn't the computed count inaccurate? (It represents all elements in the RDD, not just the elements in one partition - and mapPartitions processes one partition at a time.)
    
    Also, are we processing one partition at a time (mapPartitions vs. map) here mostly for efficiency?
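    
    (Side note: counting by itself does not require coalescing. A minimal standalone sketch, not the patch's converter, with illustrative names - count() runs a job that tallies each partition and sums the tallies on the driver, so the RDD keeps its original partitioning:)
    
        import java.util.Arrays;
        
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        
        public class CountSketch {
            public static void main(String[] args) {
                JavaSparkContext sc = new JavaSparkContext(
                        new SparkConf().setAppName("count-sketch").setMaster("local[2]"));
                // Three partitions; count() still returns the global total.
                JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 3);
                long total = rdd.count(); // 5, regardless of partition count
                System.out.println("total = " + total);
                sc.stop();
            }
        }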



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126404>

    Call this "done" instead of "proceed" ? -- latter seems unintuitive term for end of computation.



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126403>

    Let's call this "count" or just "i"? "current_val" seems a misleading name for a counter.



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126401>

    Please use
    LOG.error("Message " + e, e);
    instead.



src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java
<https://reviews.apache.org/r/32031/#comment126406>

    Please fix this comment. There is no "mapper" in our case.



src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java
<https://reviews.apache.org/r/32031/#comment126405>

    Let's make POCollectedGroup.getStreamCloseResult() public and call it here instead of duplicating the logic.


- Mohit Sabharwal


On March 13, 2015, 10:42 a.m., Praveen Rachabattuni wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32031/
> -----------------------------------------------------------
> 
> (Updated March 13, 2015, 10:42 a.m.)
> 
> 
> Review request for pig, liyun zhang and Mohit Sabharwal.
> 
> 
> Bugs: PIG-4193
>     https://issues.apache.org/jira/browse/PIG-4193
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Moved the getNextTuple(boolean proceed) method from POCollectedGroup to POCollectedGroupSpark.
> 
> When used with MR, collected group performs the group operation on the map side after ensuring that all data for the same key resides in a single map task. In Spark, this behaviour is achieved by a single mapPartitions function that applies the POCollectedGroup operator.
> 
> TODO:
> - Avoid using rdd.count() in CollectedGroupConverter.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f2f18e52e083b3e8e90ba02d07f12bcbc9be859 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java ca7a45f33320064e22628b40b34be7b9f7b07c36 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 3d04ba11855c39960e00d6f51b66654d1c70ebad 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/32031/diff/
> 
> 
> Testing
> -------
> 
> Ran TestCollectedGroup; the results are unchanged, with no new failures or new passes.
> 
> 
> Thanks,
> 
> Praveen Rachabattuni
> 
>


Re: Review Request 32031: PIG-4193: Make collected group work with Spark

Posted by Praveen R <pr...@sigmoidanalytics.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32031/
-----------------------------------------------------------

(Updated March 13, 2015, 10:42 a.m.)


Review request for pig, liyun zhang and Mohit Sabharwal.


Changes
-------

Added license header


Bugs: PIG-4193
    https://issues.apache.org/jira/browse/PIG-4193


Repository: pig-git


Description
-------

Moved the getNextTuple(boolean proceed) method from POCollectedGroup to POCollectedGroupSpark.

When used with MR, collected group performs the group operation on the map side after ensuring that all data for the same key resides in a single map task. In Spark, this behaviour is achieved by a single mapPartitions function that applies the POCollectedGroup operator.

TODO:
- Avoid using rdd.count() in CollectedGroupConverter.


Diffs (updated)
-----

  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f2f18e52e083b3e8e90ba02d07f12bcbc9be859 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java ca7a45f33320064e22628b40b34be7b9f7b07c36 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 3d04ba11855c39960e00d6f51b66654d1c70ebad 
  src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION 

Diff: https://reviews.apache.org/r/32031/diff/


Testing
-------

Ran TestCollectedGroup; the results are unchanged, with no new failures or new passes.


Thanks,

Praveen R


Re: Review Request 32031: PIG-4193: Make collected group work with Spark

Posted by kelly zhang <li...@intel.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32031/#review76344
-----------------------------------------------------------



src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java
<https://reviews.apache.org/r/32031/#comment123908>

    Add the license header at the top of the file.


- kelly zhang


On March 13, 2015, 7:51 a.m., Praveen R wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32031/
> -----------------------------------------------------------
> 
> (Updated March 13, 2015, 7:51 a.m.)
> 
> 
> Review request for pig, liyun zhang and Mohit Sabharwal.
> 
> 
> Bugs: PIG-4193
>     https://issues.apache.org/jira/browse/PIG-4193
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Moved the getNextTuple(boolean proceed) method from POCollectedGroup to POCollectedGroupSpark.
> 
> When used with MR, collected group performs the group operation on the map side after ensuring that all data for the same key resides in a single map task. In Spark, this behaviour is achieved by a single mapPartitions function that applies the POCollectedGroup operator.
> 
> TODO:
> - Avoid using rdd.count() in CollectedGroupConverter.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f2f18e52e083b3e8e90ba02d07f12bcbc9be859 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java ca7a45f33320064e22628b40b34be7b9f7b07c36 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 3d04ba11855c39960e00d6f51b66654d1c70ebad 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/32031/diff/
> 
> 
> Testing
> -------
> 
> Ran TestCollectedGroup; the results are unchanged, with no new failures or new passes.
> 
> 
> Thanks,
> 
> Praveen R
> 
>