Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2020/07/08 14:48:00 UTC

[jira] [Updated] (HUDI-1082) Bug in deciding the upsert/insert buckets

     [ https://issues.apache.org/jira/browse/HUDI-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-1082:
--------------------------------------
    Description: 
In [UpsertPartitioner|https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java], when getPartition(Object key) is called, the logic that decides which bucket an insert record goes to relies on globalInsertCounts, whereas it should rely on perPartitionInsertCount.

This is because the weights for all targetInsert buckets are determined from the total inserts going into the partition of interest. // check like 200. When getPartition(key) is called, however, we use the global insert count to pick the bucket.

 

For instance,

P1: 3 insert buckets with weights 0.2, 0.5 and 0.3; total records to be inserted: 100.

P2: 4 insert buckets with weights 0.1, 0.8, 0.05 and 0.05; total records to be inserted: 10025.

So the ideal allocation is roughly

P1: B0 -> 20, B1 -> 50, B2 -> 30

P2: B0 -> 1002, B1 -> 8020, B2 -> 501, B3 -> 501
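
The allocation above can be checked with a few lines of Python. This is only a sketch, assuming the ideal count for a bucket is simply its weight multiplied by the total inserts for that partition; the function name ideal_allocation is hypothetical and does not come from Hudi.

```python
def ideal_allocation(weights, total_inserts):
    """Ideal record count per insert bucket: weight * total inserts of the partition.

    Hypothetical helper for illustration only; not part of Hudi.
    """
    return [weight * total_inserts for weight in weights]


# Weights and totals taken from the example in this issue.
p1 = ideal_allocation([0.2, 0.5, 0.3], 100)
p2 = ideal_allocation([0.1, 0.8, 0.05, 0.05], 10025)

print("P1:", [round(count, 2) for count in p1])
print("P2:", [round(count, 2) for count in p2])
```

The P2 values are fractional, so the per-bucket counts in a real run can only be approximate.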

 

The bucket returned by getPartition() for a key is determined from the following value:

mod(hash value, inserts) / inserts

Currently, inserts is the global insert count instead of the insert count for the partition of interest.

Let's say these are the hash values for insert records in P1:

5, 14, 21, 30, 90, 500, 1490, 10019.

record hash | expected bucket in P1 | actual bucket in P1
5           | B0                    | B0
14          | B0                    | B0
21          | B1                    | B0
30          | B1                    | B0
90          | B2                    | B0
500         | B0                    | B0
1490        | B2                    | B1
10019       | B0                    | B3
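
The effect of the wrong divisor can be reproduced with a small Python sketch. It is not the Hudi implementation: pick_bucket is a hypothetical helper, it assumes a record goes to the first bucket whose cumulative weight is at least mod(hash, inserts) / inserts, and it assumes the global insert count is the sum of the P1 and P2 inserts (100 + 10025). Only the first six hashes from the table are used.

```python
from itertools import accumulate


def pick_bucket(key_hash, weights, total_inserts):
    """Return the index of the insert bucket chosen for a record hash.

    Hypothetical helper: r = mod(hash, inserts) / inserts lies in [0, 1),
    and the record goes to the first bucket whose cumulative weight is >= r.
    """
    total_inserts = max(1, total_inserts)
    r = (key_hash % total_inserts) / total_inserts
    for index, cumulative in enumerate(accumulate(weights)):
        if r <= cumulative:
            return index
    return 0  # fall back to the first bucket


P1_WEIGHTS = [0.2, 0.5, 0.3]
P1_INSERTS = 100
GLOBAL_INSERTS = 100 + 10025  # assumption: global count = P1 inserts + P2 inserts

for key_hash in (5, 14, 21, 30, 90, 500):
    expected = pick_bucket(key_hash, P1_WEIGHTS, P1_INSERTS)    # per-partition count
    actual = pick_bucket(key_hash, P1_WEIGHTS, GLOBAL_INSERTS)  # global count
    print(f"{key_hash:>5} | B{expected} | B{actual}")
```

For these six hashes, the per-partition count yields B0, B0, B1, B1, B2, B0, while the global count sends all six records to B0, because every hash is small compared with the global insert count.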


  was:
In [UpsertPartitioner|https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java], when getPartition(Object key) is called, the logic that decides which bucket an insert record goes to relies on globalInsertCounts, whereas it should rely on perPartitionInsertCount.

This is because the weights for all targetInsert buckets are determined from the total inserts going into the partition of interest. // check like 200. When getPartition(key) is called, however, we use the global insert count to pick the bucket.

For instance,

P1: 3 insert buckets with weights 0.2, 0.5 and 0.3; total records to be inserted: 100.

P2: 4 insert buckets with weights 0.1, 0.8, 0.05 and 0.05; total records to be inserted: 10025.

So the ideal allocation is roughly

P1: B0 -> 20, B1 -> 50, B2 -> 30

P2: B0 -> 1002, B1 -> 8020, B2 -> 501, B3 -> 501

The bucket returned by getPartition() for a key is determined from the following value:

mod(hash value/inserts) / inserts

Currently, inserts is the global insert count instead of the insert count for the partition of interest.

Let's say these are the hash values for insert records in P1:

5, 14, 21, 30, 90, 500, 1490, 10019.

record hash | expected bucket in P1 | actual bucket in P1
5           | B0                    | B0
14          | B0                    | B0
21          | B1                    | B0
30          | B1                    | B0
90          | B2                    | B0
500         | B0                    | B0
1490        | B2                    | B1
10019       | B0                    | B3



> Bug in deciding the upsert/insert buckets
> -----------------------------------------
>
>                 Key: HUDI-1082
>                 URL: https://issues.apache.org/jira/browse/HUDI-1082
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Writer Core
>    Affects Versions: 0.6.0
>            Reporter: sivabalan narayanan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)