Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/11/01 06:47:58 UTC

[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed

    [ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624556#comment-15624556 ] 

liyunzhang_intel commented on PIG-5029:
---------------------------------------

[~knoguchi]:  

Earlier we discussed the skewed-key sort issue in Spark mode at length. I previously proposed PIG-5029_1.patch, which appends a random integer to the original key so that skewed keys are distributed evenly (the [salted key solution|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska]). You mentioned some issues that patch introduces, such as duplicated and missing results. Later [~tgraves] commented:
{quote}
I'm assuming in the presentation they are using a salt key such that it generates the same random if the map task is reran like Koji mentioned. For instance can you use the task id (not attempt) such that the key is always the same if it re-runs.
{quote}
In PIG-5051_5029_5.patch, I now generate the random integer with {noformat}new Random(seed){noformat}, where the value of {noformat}seed{noformat} is PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX). Because that value is identical across different attempts of a Spark task, the generated random integers are the same when retries happen.
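
For illustration, here is a minimal sketch of the deterministic salting idea (the class and helper names are hypothetical, not the patch code; the taskIndex parameter stands in for the value read from PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX)):
{code}
import java.util.Random;

// Illustrative sketch only, not the actual patch code.
public class SaltedKeySketch {
    private final Random random;
    private final int numSalts;

    public SaltedKeySketch(long taskIndex, int numSalts) {
        // Seeding with the task index (not the attempt id) makes the
        // sequence of salts identical if the task is re-run.
        this.random = new Random(taskIndex);
        this.numSalts = numSalts;
    }

    // Append a bounded random integer so that a single hot key is
    // spread across up to numSalts partitions.
    public String salt(String key) {
        return key + "#" + random.nextInt(numSalts);
    }

    // Recover the original key after the shuffle/sort.
    public static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }
}
{code}
The point is that the salt sequence is a pure function of the task index, so a retried task reproduces exactly the same salted keys and no records are duplicated or lost.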

Do you think this solution is OK? If so, [~kexianda] will start to review the patch.

Thanks again to [~knoguchi] for the patience on this JIRA; this is a really interesting issue.



> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, PIG-5029_2.patch, PIG-5029_3.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan, which produces the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56] to implement the sort feature. Although RDD.sortByKey uses a [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106], which samples the data and splits the keys into roughly equal ranges, the test results (attached document) show that one partition receives most of the keys and takes a long time to finish.
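>
> The effect can be reproduced outside Pig. Here is a minimal standalone sketch (hypothetical demo code, not PigMix or the Pig-on-Spark source) of how a single hot key defeats range partitioning:
> {code}
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import scala.Tuple2;
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class SkewDemo {
>     public static void main(String[] args) {
>         JavaSparkContext sc = new JavaSparkContext("local[4]", "skew-demo");
>         List<Tuple2<String, Integer>> data = new ArrayList<>();
>         for (int i = 0; i < 100000; i++) {
>             // ~90% of the rows share one hot key, like a skewed query_term.
>             String key = (i % 10 == 0) ? "key-" + i : "hot";
>             data.add(new Tuple2<>(key, i));
>         }
>         JavaPairRDD<String, Integer> sorted =
>                 sc.parallelizePairs(data).sortByKey(true, 4);
>         // Count records per partition: RangePartitioner sampled the keys,
>         // but a single key cannot be split, so one partition gets ~90%.
>         sorted.mapPartitionsWithIndex((idx, it) -> {
>             int n = 0;
>             while (it.hasNext()) { it.next(); n++; }
>             return Collections.singletonList("partition " + idx + ": " + n).iterator();
>         }, true).collect().forEach(System.out::println);
>         sc.stop();
>     }
> }
> {code}
> Since every record with the hot key has the same sort key, RangePartitioner must place all of them in one range, so that partition's task dominates the runtime.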



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)