Posted to commits@hudi.apache.org by "Hong Shen (Jira)" <ji...@apache.org> on 2020/07/14 12:53:00 UTC
[jira] [Comment Edited] (HUDI-1082) Bug in deciding the upsert/insert buckets
[ https://issues.apache.org/jira/browse/HUDI-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157341#comment-17157341 ]
Hong Shen edited comment on HUDI-1082 at 7/14/20, 12:52 PM:
------------------------------------------------------------
It seems this does not matter; we just need to ensure the records are distributed across fileIds.
Add a test case testGetPartitioner2 with two partitions, "2016/09/26" and "2016/09/27". Generate 200 insert records for partition "2016/09/26" and 2000 insert records for "2016/09/27". Since the file size limit is 1000 * 1024, partition "2016/09/26" will generate 2 fileGroups and partition "2016/09/27" will generate 20 fileGroups. Of the 200 insert records for partition "2016/09/26", approximately 100 go to fileGroup1 and the rest to fileGroup2.
{code:java|title=TestUpsertPartitioner.java|borderStyle=solid}
@Test
public void testGetPartitioner2() throws Exception {
  String testPartitionPath1 = "2016/09/26";
  String testPartitionPath2 = "2016/09/27";
  HoodieWriteConfig config = makeHoodieClientConfigBuilder()
      .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
          .insertSplitSize(100).autoTuneInsertSplits(false).build())
      .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
  HoodieClientTestUtils.fakeCommitFile(basePath, "001");
  metaClient = HoodieTableMetaClient.reload(metaClient);
  HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
  // 200 inserts for the first partition, 2000 for the second.
  HoodieTestDataGenerator dataGenerator1 = new HoodieTestDataGenerator(new String[] {testPartitionPath1});
  List<HoodieRecord> insertRecords1 = dataGenerator1.generateInserts("001", 200);
  List<HoodieRecord> records1 = new ArrayList<>(insertRecords1);
  HoodieTestDataGenerator dataGenerator2 = new HoodieTestDataGenerator(new String[] {testPartitionPath2});
  List<HoodieRecord> insertRecords2 = dataGenerator2.generateInserts("001", 2000);
  records1.addAll(insertRecords2);
  WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records1));
  UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
  // Count how many of the first partition's records land in each bucket.
  Map<Integer, Integer> partition2numRecords = new HashMap<>();
  for (HoodieRecord hoodieRecord : insertRecords1) {
    int partition = partitioner.getPartition(new Tuple2<>(
        hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation())));
    partition2numRecords.merge(partition, 1, Integer::sum);
  }
  System.out.println(partition2numRecords);
}
{code}
Running the test 5 times produces:
{code:java}
{20=106, 21=94}
{20=97, 21=103}
{20=95, 21=105}
{20=107, 21=93}
{20=113, 21=87}
{code}
> Bug in deciding the upsert/insert buckets
> -----------------------------------------
>
> Key: HUDI-1082
> URL: https://issues.apache.org/jira/browse/HUDI-1082
> Project: Apache Hudi
> Issue Type: Bug
> Components: Writer Core
> Affects Versions: 0.6.0
> Reporter: sivabalan narayanan
> Assignee: Hong Shen
> Priority: Major
> Fix For: 0.6.1
>
>
> In [UpsertPartitioner|https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java], when getPartition(Object key) is called, the logic that determines where a record should be inserted relies on the global insert count, whereas it should use the per-partition insert count.
>
> Because the weights for all target insert buckets are determined from the total inserts going into the partition of interest (see around line 200), whereas when getPartition(key) is called, we use the global insert count to pick the bucket.
>
> For instance,
> P1: 3 insert buckets with weights 0.2, 0.5 and 0.3 with total records to be inserted is 100.
> P2: 4 bucket with weights 0.1, 0.8, 0.05, 0.05 with total records to be inserted is 10025.
> So, ideal allocation is
> P1: B0 -> 20, B1 -> 50, B2 -> 30
> P2: B0 -> 1002, B1 -> 8020, B2 -> 502, B3 -> 503
>
> getPartition() for a key is determined as follows:
> (hash value mod inserts) / inserts, matched against the cumulative bucket weights.
> Instead of using the insert count for the partition of interest, we currently take the global insert count.
> Let's say these are the hash values for insert records in P1:
> 5, 14, 21, 30, 90, 500, 1490, 10019.
> record hash | expected bucket in P1 | actual bucket in P1
> 5           | B0                    | B0
> 14          | B0                    | B0
> 21          | B1                    | B0
> 30          | B1                    | B0
> 90          | B2                    | B0
> 500         | B0                    | B0
> 1490        | B2                    | B1
> 10019       | B0                    | B3
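To make the mismatch concrete, here is a minimal standalone sketch of the bucket math described above (this is not the actual UpsertPartitioner code; the class and method names are hypothetical, and the cumulative weights 0.2/0.7/1.0 and insert counts 100 vs. 10125 are taken from the P1 example):

```java
// Hypothetical illustration of the bucket-selection math from the example.
public class BucketMath {

  // Pick a bucket: r = (hash mod inserts) / inserts, then return the index of
  // the first cumulative weight boundary that exceeds r.
  static int pickBucket(long hash, long inserts, double[] cumulativeWeights) {
    double r = (double) (hash % inserts) / inserts;
    for (int i = 0; i < cumulativeWeights.length; i++) {
      if (r < cumulativeWeights[i]) {
        return i;
      }
    }
    return cumulativeWeights.length - 1;
  }

  public static void main(String[] args) {
    // P1's cumulative bucket weights (0.2, 0.5, 0.3 as fractions -> 0.2, 0.7, 1.0).
    double[] p1Cumulative = {0.2, 0.7, 1.0};
    long perPartitionInserts = 100;  // inserts destined for P1
    long globalInserts = 10125;      // inserts across all partitions (100 + 10025)
    long[] hashes = {5, 14, 21, 30, 90, 500, 1490, 10019};
    for (long h : hashes) {
      int expected = pickBucket(h, perPartitionInserts, p1Cumulative);
      int actual = pickBucket(h, globalInserts, p1Cumulative);
      System.out.println(h + " -> expected B" + expected + ", got B" + actual);
    }
  }
}
```

For hash 90, for example, the per-partition count gives (90 mod 100) / 100 = 0.9, which falls in B2, while the global count gives 90 / 10125 ≈ 0.009, which wrongly falls in B0 — exactly the skew shown in the table above.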
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)