Posted to dev@pig.apache.org by "Mohit Sabharwal (JIRA)" <ji...@apache.org> on 2016/07/16 00:54:20 UTC
[jira] [Commented] (PIG-4553) Implement secondary sort using 1 shuffle not twice
[ https://issues.apache.org/jira/browse/PIG-4553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15380386#comment-15380386 ]
Mohit Sabharwal commented on PIG-4553:
--------------------------------------
LGTM, +1 (non-binding)
> Implement secondary sort using 1 shuffle not twice
> --------------------------------------------------
>
> Key: PIG-4553
> URL: https://issues.apache.org/jira/browse/PIG-4553
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4553_1.patch, PIG-4553_2.patch
>
>
> Currently, secondary key sort is implemented in
> GlobalRearrangeConverter#convert with two shuffles: the first in
> repartitionAndSortWithinPartitions and the second in groupBy.
> {code}
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>         POGlobalRearrangeSpark physicalOperator) throws IOException {
>     ....
>     if (predecessors.size() == 1) {
>         // GROUP
>         JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
>         if (physicalOperator.isUseSecondaryKey()) {
>             RDD<Tuple> rdd = predecessors.get(0);
>             RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
>                     SparkUtil.<Tuple, Object>getTuple2Manifest());
>             JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
>                     SparkUtil.getManifest(Tuple.class),
>                     SparkUtil.getManifest(Object.class));
>             // first sort the tuples by secondary key if useSecondaryKey is enabled
>             JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
>                     new HashPartitioner(parallelism),
>                     new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder())); // first shuffle
>             JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
>             prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism); // second shuffle
>         } else {
>             JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>             prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
>         }
>         JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
>         return jrdd2.rdd();
>     }
>     ....
> }
> {code}
> We can optimize this to a single shuffle, following the approach used in https://github.com/tresata/spark-sorted.
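> The key idea behind the single-shuffle approach: once one shuffle has
> partitioned records by primary key and sorted each partition by
> (primaryKey, secondaryKey), groups can be assembled in a single linear
> pass over the sorted partition, so the second groupBy shuffle is
> unnecessary. The following is a minimal, Spark-free sketch of that
> pass (the Rec type and groupSorted method are hypothetical names for
> illustration, not part of the Pig or spark-sorted code):
> {code}
> import java.util.ArrayList;
> import java.util.Comparator;
> import java.util.LinkedHashMap;
> import java.util.List;
>
> public class SecondarySortSketch {
>     // A record with a primary (grouping) key and a secondary (sort) key.
>     record Rec(String key, int sec) {}
>
>     // Simulates what one partition looks like after a single shuffle
>     // that sorts by the composite (key, sec) comparator: grouping then
>     // needs only one pass, and each group's values arrive pre-sorted.
>     static LinkedHashMap<String, List<Integer>> groupSorted(List<Rec> partition) {
>         List<Rec> sorted = new ArrayList<>(partition);
>         sorted.sort(Comparator.comparing(Rec::key).thenComparingInt(Rec::sec));
>         LinkedHashMap<String, List<Integer>> groups = new LinkedHashMap<>();
>         for (Rec r : sorted) {
>             // Consecutive records share a key, so this is a streaming append.
>             groups.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r.sec());
>         }
>         return groups;
>     }
>
>     public static void main(String[] args) {
>         List<Rec> part = List.of(new Rec("b", 2), new Rec("a", 3),
>                                  new Rec("b", 1), new Rec("a", 1));
>         // Groups come out with their values already in secondary-key order.
>         System.out.println(groupSorted(part)); // prints {a=[1, 3], b=[1, 2]}
>     }
> }
> {code}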
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)