Posted to dev@pig.apache.org by "Mohit Sabharwal (JIRA)" <ji...@apache.org> on 2015/05/21 05:33:00 UTC
[jira] [Commented] (PIG-4565) Support custom MR partitioners for Spark engine
[ https://issues.apache.org/jira/browse/PIG-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553571#comment-14553571 ]
Mohit Sabharwal commented on PIG-4565:
--------------------------------------
FYI: [~kellyzly], [~praveenr019], [~xuefuz]
This patch:
- Adds the wrapper MapReducePartitionerWrapper
- Changes GlobalRearrangeConverter and DistinctConverter to use this wrapper.
- Changes GlobalRearrangeConverter such that it uses CoGroupedRDD for BOTH GROUP (1 input) and COGROUP (>1 inputs). (There is no good reason why we should use different Spark APIs for 1 vs multiple inputs.)
- Evaluating the optimal API for shuffling (CoGroupedRDD vs reduceByKey vs groupBy) is a TODO item in Milestone 1.
- The secondary sort case does not yet support a custom partitioner. This will be fixed as part of the shuffle API work above.
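For readers unfamiliar with the two partitioner APIs, here is a minimal sketch of the wrapping idea. It is not the code in the attached patch. MrStylePartitioner and SparkStylePartitioner are hypothetical stand-ins that only mirror the method shapes of org.apache.hadoop.mapreduce.Partitioner and org.apache.spark.Partitioner, so the example compiles without Hadoop or Spark on the classpath. PartitionerAdapterSketch and IntModPartitioner are likewise illustrative names. The sketch assumes the wrapped partitioner looks only at the key, so it passes null for the value, and it normalizes the result with Math.floorMod as a defensive choice.

```java
// Hypothetical stand-in mirroring the shape of org.apache.hadoop.mapreduce.Partitioner.
abstract class MrStylePartitioner<K, V> {
    abstract int getPartition(K key, V value, int numPartitions);
}

// Hypothetical stand-in mirroring the shape of org.apache.spark.Partitioner.
abstract class SparkStylePartitioner {
    abstract int numPartitions();
    abstract int getPartition(Object key);
}

// Adapter: exposes an MR-style partitioner through the Spark-style interface.
final class PartitionerAdapterSketch<K, V> extends SparkStylePartitioner {
    private final MrStylePartitioner<K, V> delegate;
    private final int numPartitions;

    PartitionerAdapterSketch(MrStylePartitioner<K, V> delegate, int numPartitions) {
        if (delegate == null) {
            throw new IllegalArgumentException("delegate must not be null");
        }
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.delegate = delegate;
        this.numPartitions = numPartitions;
    }

    @Override
    int numPartitions() {
        return numPartitions;
    }

    @Override
    @SuppressWarnings("unchecked")
    int getPartition(Object key) {
        // The Spark-style call carries no value, so null is passed here.
        // Assumption: the wrapped partitioner decides on the key alone.
        int raw = delegate.getPartition((K) key, null, numPartitions);
        // floorMod keeps the index in [0, numPartitions) even if the
        // delegate returns a negative number (e.g. from a negative hashCode).
        return Math.floorMod(raw, numPartitions);
    }
}

// Key-only partitioner in the spirit of the SimpleCustomPartitioner example.
final class IntModPartitioner extends MrStylePartitioner<Object, Object> {
    @Override
    int getPartition(Object key, Object value, int numPartitions) {
        if (key instanceof Integer) {
            return ((Integer) key) % numPartitions;
        }
        return key.hashCode() % numPartitions;
    }
}

class WrapperDemo {
    public static void main(String[] args) {
        SparkStylePartitioner p =
                new PartitionerAdapterSketch<>(new IntModPartitioner(), 2);
        for (int key : new int[] {3, 4, -3}) {
            System.out.println(key + " -> partition " + p.getPartition(key));
        }
    }
}
```

In a real integration the adapter would extend org.apache.spark.Partitioner, and the key would first have to be converted into whatever key type the MapReduce partitioner expects (PigNullableWritable in the example from the issue description). That conversion is left out of the sketch.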
> Support custom MR partitioners for Spark engine
> ------------------------------------------------
>
> Key: PIG-4565
> URL: https://issues.apache.org/jira/browse/PIG-4565
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Affects Versions: spark-branch
> Reporter: Mohit Sabharwal
> Assignee: Mohit Sabharwal
> Fix For: spark-branch
>
> Attachments: PIG-4565.patch
>
>
> Shuffle operations like DISTINCT, GROUP, JOIN, CROSS allow custom MR partitioners to be specified.
> Example:
> {code}
> B = GROUP A BY $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner PARALLEL 2;
>
> public class SimpleCustomPartitioner extends Partitioner<PigNullableWritable, Writable> {
>     //@Override
>     public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
>         if (key.getValueAsPigType() instanceof Integer) {
>             int ret = (((Integer) key.getValueAsPigType()).intValue() % numPartitions);
>             return ret;
>         } else {
>             return (key.hashCode()) % numPartitions;
>         }
>     }
> }
> {code}
> Since Spark's shuffle APIs take a different partitioner class (org.apache.spark.Partitioner) than MapReduce does (org.apache.hadoop.mapreduce.Partitioner), we need to wrap custom partitioners written for MapReduce inside a Spark Partitioner.