Posted to commits@hudi.apache.org by "Hong Shen (Jira)" <ji...@apache.org> on 2020/07/14 12:53:00 UTC

[jira] [Comment Edited] (HUDI-1082) Bug in deciding the upsert/insert buckets

    [ https://issues.apache.org/jira/browse/HUDI-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157341#comment-17157341 ] 

Hong Shen edited comment on HUDI-1082 at 7/14/20, 12:52 PM:
------------------------------------------------------------

It seems this does not matter; we just need to ensure that the records are distributed across the fileIds.

Add a test case testGetPartitioner2 with two partitions, "2016/09/26" and "2016/09/27". Generate 200 insert records for partition "2016/09/26" and 2000 insert records for "2016/09/27". Since the file size limit is 1000 * 1024, partition "2016/09/26" will produce 2 file groups and partition "2016/09/27" will produce 20 file groups. Of the 200 insert records for partition "2016/09/26", approximately 100 should go to fileGroup1 and the rest to fileGroup2.

 
{code:java|title=TestUpsertPartitioner.java|borderStyle=solid}
@Test
public void testGetPartitioner2() throws Exception {
  String testPartitionPath1 = "2016/09/26";
  String testPartitionPath2 = "2016/09/27";

  HoodieWriteConfig config = makeHoodieClientConfigBuilder()
      .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
          .insertSplitSize(100).autoTuneInsertSplits(false).build())
      .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();

  HoodieClientTestUtils.fakeCommitFile(basePath, "001");

  metaClient = HoodieTableMetaClient.reload(metaClient);
  HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

  // 200 inserts for partition 2016/09/26
  HoodieTestDataGenerator dataGenerator1 = new HoodieTestDataGenerator(new String[] {testPartitionPath1});
  List<HoodieRecord> insertRecords1 = dataGenerator1.generateInserts("001", 200);
  List<HoodieRecord> records1 = new ArrayList<>();
  records1.addAll(insertRecords1);

  // 2000 inserts for partition 2016/09/27
  HoodieTestDataGenerator dataGenerator2 = new HoodieTestDataGenerator(new String[] {testPartitionPath2});
  List<HoodieRecord> insertRecords2 = dataGenerator2.generateInserts("001", 2000);
  records1.addAll(insertRecords2);

  WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records1));
  UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);

  // Count how many of partition 2016/09/26's records land in each bucket
  Map<Integer, Integer> partition2numRecords = new HashMap<>();
  for (HoodieRecord hoodieRecord : insertRecords1) {
    int partition = partitioner.getPartition(new Tuple2<>(
        hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation())));
    if (!partition2numRecords.containsKey(partition)) {
      partition2numRecords.put(partition, 0);
    }
    int num = partition2numRecords.get(partition);
    partition2numRecords.put(partition, num + 1);
  }
  System.out.println(partition2numRecords);
}
{code}
Run 5 times; the outputs are:
{code}
{20=106, 21=94}
{20=97, 21=103}
{20=95, 21=105}
{20=107, 21=93}
{20=113, 21=87}
{code}
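The mismatch described in the issue can be sketched outside of Hudi. The snippet below is a simplified, hypothetical illustration (the class BucketBugDemo, the chooseBucket helper, and the hard-coded cumulative weights are assumptions built from the example in the issue description, not Hudi's actual UpsertPartitioner code): dividing mod(hash, inserts) by the per-partition insert count (100) spreads P1's records across its buckets according to the weights, while dividing by the global count (10125) pushes almost every ratio toward zero and hence toward bucket B0.

{code:java|title=BucketBugDemo.java|borderStyle=solid}
// Hypothetical sketch of the bucket-choice logic described in HUDI-1082.
// Not Hudi's real code: chooseBucket and the weights are illustrative only.
public class BucketBugDemo {

  // Cumulative weights for P1's three insert buckets (0.2, 0.5, 0.3).
  static final double[] CUMULATIVE_WEIGHTS = {0.2, 0.7, 1.0};

  // Pick the first bucket whose cumulative weight exceeds mod(hash, inserts) / inserts.
  static int chooseBucket(long hash, long inserts) {
    double r = (double) (hash % inserts) / inserts;
    for (int b = 0; b < CUMULATIVE_WEIGHTS.length; b++) {
      if (r < CUMULATIVE_WEIGHTS[b]) {
        return b;
      }
    }
    return CUMULATIVE_WEIGHTS.length - 1;
  }

  public static void main(String[] args) {
    long perPartitionInserts = 100;  // inserts going into P1 (correct divisor)
    long globalInserts = 10125;      // 100 (P1) + 10025 (P2) (buggy divisor)
    long[] hashes = {5, 14, 21, 30, 90, 500, 1490};
    for (long h : hashes) {
      System.out.printf("hash=%5d perPartition=B%d global=B%d%n",
          h, chooseBucket(h, perPartitionInserts), chooseBucket(h, globalInserts));
    }
  }
}
{code}

With the per-partition divisor, hashes 21 and 30 land in B1 while 90 and 1490 land in B2, matching the weights; with the global divisor each of these ratios stays below 0.2, so they all collapse into B0.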



> Bug in deciding the upsert/insert buckets
> -----------------------------------------
>
>                 Key: HUDI-1082
>                 URL: https://issues.apache.org/jira/browse/HUDI-1082
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Writer Core
>    Affects Versions: 0.6.0
>            Reporter: sivabalan narayanan
>            Assignee: Hong Shen
>            Priority: Major
>             Fix For: 0.6.1
>
>
> In [UpsertPartitioner|https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java], when getPartition(Object key) is called, the logic that decides where a record should be inserted relies on the global insert count, whereas it should use the per-partition insert count.
>  
> Because the weights for all target insert buckets are determined from the total inserts going into the partition of interest (see around line 200 of UpsertPartitioner), whereas when getPartition(key) is called, we use the global insert count to determine the right bucket.
>  
> For instance,
> P1: 3 insert buckets with weights 0.2, 0.5 and 0.3, with 100 total records to be inserted.
> P2: 4 insert buckets with weights 0.1, 0.8, 0.05 and 0.05, with 10025 total records to be inserted.
> So the ideal allocation is:
> P1: B0 -> 20, B1 -> 50, B2 -> 30
> P2: B0 -> 1002, B1 -> 8020, B2 -> 502, B3 -> 503
>  
> getPartition() for a key is determined as follows:
> mod(hashValue, inserts) / inserts
> Instead of considering the inserts for the partition of interest, we currently take the global insert count.
> Let's say these are the hash values for the insert records in P1:
> 5, 14, 21, 30, 90, 500, 1490, 10019.
> ||record hash||expected bucket in P1||actual bucket in P1||
> |5|B0|B0|
> |14|B0|B0|
> |21|B1|B0|
> |30|B1|B0|
> |90|B2|B0|
> |500|B0|B0|
> |1490|B2|B1|
> |10019|B0|B3|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)