Posted to dev@pig.apache.org by "Rohini Palaniswamy (JIRA)" <ji...@apache.org> on 2016/10/14 03:34:20 UTC

[jira] [Comment Edited] (PIG-5041) RoundRobinPartitioner is not deterministic when order of input records change

    [ https://issues.apache.org/jira/browse/PIG-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574013#comment-15574013 ] 

Rohini Palaniswamy edited comment on PIG-5041 at 10/14/16 3:33 AM:
-------------------------------------------------------------------

bq. Bag can also be a key. 
  In distinct? That definitely is a possibility and I think we will have to try to fix that case. I don't see folks using it as a key in group by, order by or join. I will open a separate JIRA for that, as it will involve changing the hashcode implementation for bags. That is not a concern for this issue or the partitioner implementation, as we don't care where each record goes as long as it goes to the same partition every time. Skipping bags and maps in the hashcode computation will also provide a speedup. In the case of distinct, identical bags have to go to the same reducer, so the hashcode of the bags is important there.

bq. I think that only happens when client/server running different jvm
   MapReduce/Tez shuffle makes no guarantee about the order of the values in a Key, List<Values> input to the reducer. It all depends on which map's shuffle output is fetched first during the merge process. So the values in the bag can be in any order, and that does not depend on the JVM.
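
As a concrete illustration of the approach discussed above, here is a minimal sketch, assuming a plain Hadoop Partitioner over Pig tuples, of partitioning on the hashcode of the value's scalar fields while skipping bags and maps. The class name ValueHashPartitionerSketch and the hashing scheme are hypothetical and are not Pig's actual implementation.

{code:java}
import java.util.Map;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Hypothetical sketch: hash only the scalar fields of the value tuple so that
// the same record always lands in the same partition, regardless of the order
// in which records arrive. Bags and maps are skipped, as discussed above, for
// speed and because they are not needed to get a stable partition here.
public class ValueHashPartitionerSketch extends Partitioner<Writable, Tuple> {

    @Override
    public int getPartition(Writable key, Tuple value, int numPartitions) {
        int hash = 17;
        try {
            for (int i = 0; i < value.size(); i++) {
                Object field = value.get(i);
                if (field instanceof DataBag || field instanceof Map) {
                    continue; // skip bags and maps
                }
                hash = 31 * hash + (field == null ? 0 : field.hashCode());
            }
        } catch (ExecException e) {
            throw new RuntimeException("Unable to read tuple field", e);
        }
        // Mask the sign bit so the modulo result is non-negative.
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }
}
{code}

With something like this, two map attempts that see the same records in a different order still send each record to the same partition, which is the property the fix needs.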


was (Author: rohini):
bq. Bag can also be a key. 
  In distinct? That definitely is a possibility and I think we will have to try to fix that case. I don't see folks using it as a key in group by, order by or join. 

bq. I think that only happens when client/server running different jvm
   MapReduce/Tez shuffle makes no guarantee about the order of the values in a Key, List<Values> input to the reducer. It all depends on which map's shuffle output is fetched first during the merge process. So the values in the bag can be in any order, and that does not depend on the JVM.

> RoundRobinPartitioner is not deterministic when order of input records change
> -----------------------------------------------------------------------------
>
>                 Key: PIG-5041
>                 URL: https://issues.apache.org/jira/browse/PIG-5041
>             Project: Pig
>          Issue Type: Bug
>            Reporter: Rohini Palaniswamy
>            Assignee: Rohini Palaniswamy
>            Priority: Critical
>             Fix For: 0.16.1
>
>         Attachments: PIG-5041-1.patch, PIG-5041-2.patch
>
>
> Maps can be rerun due to shuffle fetch failures. Half of the reducers can end up successfully pulling partitions from the first run of the map, while the other half could pull from the rerun after shuffle fetch failures. If the data is not partitioned by the Partitioner exactly the same way every time, this can lead to incorrect results (lost records and duplicated records). 
> There is a good probability of the order of input records changing:
>     - With OrderedGroupedMergedKVInput (shuffle input), the keys are sorted but the values can be in any order, as the shuffle and merge depend on the order in which inputs are fetched. Anything involving FLATTEN can produce a different order of output records.
>     - With UnorderedKVInput, the records could be in any order depending on the order of shuffle fetch.
> RoundRobinPartitioner can partition records differently every time the order of input records changes, which is very bad. We need to get rid of RoundRobinPartitioner. Since the key is empty whenever we use RoundRobinPartitioner, we need to partition based on the hashcode of the values to produce consistent partitioning (see the sketch after this description).
> Partitioning based on hashcode is required for correctness, but the disadvantage is that it
>     - adds a lot of performance overhead from hashcode computation
>     - with the random distribution due to hashcode (as opposed to batched round robin), input records sorted on some column could get distributed to different reducers, and if the union is followed by a store, the output can have bad compression.
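
To make the non-determinism concrete, the following is a hypothetical, simplified stand-in for a counter-based round-robin partitioner (not the actual RoundRobinPartitioner source). The partition a record receives depends only on its position in the input stream, so when a rerun map sees the same records in a different order, the same record can land in a different partition.

{code:java}
// Hypothetical simplification of a counter-based round-robin partitioner.
// The record itself is ignored; only its arrival order matters.
public class RoundRobinSketch {
    private int next = -1;

    public int getPartition(String record, int numPartitions) {
        next = (next + 1) % numPartitions;
        return next;
    }

    public static void main(String[] args) {
        RoundRobinSketch firstRun = new RoundRobinSketch();
        RoundRobinSketch rerun = new RoundRobinSketch();
        String[] run1 = {"a", "b", "c"};
        String[] run2 = {"c", "a", "b"}; // same records, different order
        for (String r : run1) {
            System.out.println("run 1: " + r + " -> partition " + firstRun.getPartition(r, 2));
        }
        for (String r : run2) {
            System.out.println("rerun: " + r + " -> partition " + rerun.getPartition(r, 2));
        }
        // Record "a" gets partition 0 in the first run but partition 1 in the
        // rerun, so reducers pulling from different map attempts can lose or
        // duplicate it.
    }
}
{code}

A value-hash partitioner along the lines of the sketch shown earlier in this message removes this order dependence, at the cost of the hashcode overhead noted in the description.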



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)