Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/07/18 03:28:20 UTC

[jira] [Commented] (PIG-4553) Implement secondary sort using one shuffle

    [ https://issues.apache.org/jira/browse/PIG-4553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381663#comment-15381663 ] 

liyunzhang_intel commented on PIG-4553:
---------------------------------------

[~xuefuz]: Please commit it to the branch, thanks.

> Implement secondary sort using one shuffle
> ------------------------------------------
>
>                 Key: PIG-4553
>                 URL: https://issues.apache.org/jira/browse/PIG-4553
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4553_1.patch, PIG-4553_2.patch
>
>
> Currently, secondary key sort is implemented in GlobalRearrangeConverter#convert
> using two shuffles: the first in repartitionAndSortWithinPartitions and the
> second in groupBy.
> {code}
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>                               POGlobalRearrangeSpark physicalOperator) throws IOException {
> ....
>   if (predecessors.size() == 1) {
>             // GROUP
>             JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
>             if (physicalOperator.isUseSecondaryKey()) {
>                 RDD<Tuple> rdd = predecessors.get(0);
>                 RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
>                         SparkUtil.<Tuple, Object>getTuple2Manifest());
>                 JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
>                         SparkUtil.getManifest(Tuple.class),
>                         SparkUtil.getManifest(Object.class));
>                 // First shuffle: sort the tuples by secondary key when useSecondaryKey is enabled.
>                 JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
>                         new HashPartitioner(parallelism),
>                         new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
>                 JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
>                 // Second shuffle: group the sorted tuples by key.
>                 prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
>             } else {
>                 JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>                 prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
>             }
>             JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
>             return jrdd2.rdd();
>         } 
> ....
> }
> {code}
> We can optimize this to use a single shuffle, following the approach in https://github.com/tresata/spark-sorted.
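
To illustrate why one shuffle can be enough, here is a minimal plain-Java sketch (no Spark dependency) of the general idea. It is not the code from the attached patches or from spark-sorted. It assumes that records are partitioned on the group key only and then sorted within each partition by (group key, secondary key); under that assumption, all records for a key sit next to each other in one partition, so grouping is a single streaming pass rather than a second shuffle. The names OneShuffleSecondarySort, Rec, shuffleAndSort and groupAdjacent are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OneShuffleSecondarySort {

    /** Hypothetical record: a group (primary) key plus a secondary sort key. */
    static final class Rec {
        final String key;
        final int secondary;

        Rec(String key, int secondary) {
            this.key = key;
            this.secondary = secondary;
        }
    }

    /**
     * Simulates the single shuffle: hash-partition on the group key only,
     * then sort every partition by (group key, secondary key).
     */
    static List<List<Rec>> shuffleAndSort(List<Rec> input, int numPartitions) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<Rec>());
        }
        for (Rec r : input) {
            // Assumption: the partition depends on the group key alone.
            int p = Math.floorMod(r.key.hashCode(), numPartitions);
            partitions.get(p).add(r);
        }
        Comparator<Rec> order =
                Comparator.comparing((Rec r) -> r.key).thenComparingInt(r -> r.secondary);
        for (List<Rec> part : partitions) {
            part.sort(order);
        }
        return partitions;
    }

    /**
     * Groups adjacent records that share a key in one pass per partition.
     * No data moves between partitions, so no second shuffle is required.
     */
    static Map<String, List<Integer>> groupAdjacent(List<List<Rec>> partitions) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (List<Rec> part : partitions) {
            String currentKey = null;
            List<Integer> current = null;
            for (Rec r : part) {
                if (current == null || !r.key.equals(currentKey)) {
                    // A new key starts here; equal keys are contiguous after sorting.
                    currentKey = r.key;
                    current = new ArrayList<>();
                    groups.put(currentKey, current);
                }
                current.add(r.secondary);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("a", 3), new Rec("b", 2), new Rec("a", 1),
                new Rec("b", 5), new Rec("a", 2));
        Map<String, List<Integer>> grouped = groupAdjacent(shuffleAndSort(input, 2));
        // Each group's values come out already ordered by the secondary key.
        System.out.println(grouped);
    }
}
```

In Spark terms, shuffleAndSort plays the role of repartitionAndSortWithinPartitions with a partitioner that looks only at the group key, and groupAdjacent corresponds to a mapPartitions step over the sorted iterator. Whether the existing HashPartitioner and ToValueFunction in the snippet above fit that shape is not confirmed here.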


