Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/09/13 08:42:21 UTC

[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

    [ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486676#comment-15486676 ] 

liyunzhang_intel commented on PIG-5029:
---------------------------------------

PIG-5029.patch handles the skewed-data sort as follows:
0. Add an option "pig.spark.skewed.sort" to pig.properties. Users can enable it when they expect the data being sorted to be skewed.
1. Append a random integer to each tuple to be sorted, so that (1,(xxx)) becomes (1,(xxx),random integer) (SortConverter.ToKeyValueFunction).
2. Add a new class, SkewedKeyComparator, in SortConverter. It affects how RDD.sortByKey partitions the data during the shuffle. Previously the partitions were unevenly sized because of the skewed key. Now the combined key (key, random integer) is used for partitioning, so the partitions are no longer skewed.
3. Remove the trailing random integer from each tuple in SortConverter.ToValueFunction.
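
The steps above can be illustrated with a small stand-alone simulation. This is only a sketch in plain Python, not the code in PIG-5029.patch and not Spark's RangePartitioner: the helper names (range_bounds, partition_of, partition_sizes, salted_sort) are hypothetical, the "sample" is simply the full key list, and the salt range is an arbitrary choice. It assumes range bounds are taken at evenly spaced ranks of the sorted keys and that equal keys always fall into the same range.

```python
import bisect
import random
from collections import Counter


def range_bounds(sorted_keys, num_partitions):
    """Pick num_partitions - 1 bounds at evenly spaced ranks of the sorted keys."""
    n = len(sorted_keys)
    return [sorted_keys[(i * n) // num_partitions] for i in range(1, num_partitions)]


def partition_of(key, bounds):
    """Range index of a key: the number of bounds strictly smaller than the key."""
    return bisect.bisect_left(bounds, key)


def partition_sizes(keys, num_partitions):
    """Count how many keys land in each range partition."""
    bounds = range_bounds(sorted(keys), num_partitions)
    counts = Counter(partition_of(k, bounds) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


def salted_sort(records, num_partitions, rng):
    """Sort (key, value) records on a salted key; return sorted records and partition sizes."""
    # Step 1: append a random integer (the salt) to every sort key.
    salted = [((key, rng.randrange(1 << 30)), value) for key, value in records]
    # Step 2: partition and sort on the combined (key, salt) key.
    sizes = partition_sizes([k for k, _ in salted], num_partitions)
    salted.sort(key=lambda kv: kv[0])
    # Step 3: drop the salt again so the output has the original shape.
    return [(key, value) for (key, _salt), value in salted], sizes


if __name__ == "__main__":
    # Hypothetical skewed input: 900 records share one key, 100 keys are distinct.
    records = [("hot", i) for i in range(900)] + [("k%03d" % i, i) for i in range(100)]
    print("unsalted:", partition_sizes([k for k, _ in records], 4))
    _, sizes = salted_sort(records, 4, random.Random(0))
    print("salted:  ", sizes)
```

In this sketch the unsalted keys put all 900 "hot" records into a single range, because every bound equals "hot". With the salt, the bounds fall between different (key, salt) pairs, so the same records spread across the ranges while the output stays ordered by the original key. Records that share a key may come out in a different relative order than they went in.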

> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan, which produces the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56] to implement the sort feature. RDD.sortByKey uses [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106], which samples the data and divides the keys into ranges of roughly equal size. Even so, the test results (see the attached document) show that one partition receives most of the keys and takes a long time to finish.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)