You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:30:10 UTC
[hudi] 02/11: [HUDI-3835] Add UT for delete in java client (#5270)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7bc940c7e4fb26dfa850689161dec754005b406b
Author: 董可伦 <do...@inspur.com>
AuthorDate: Sat Apr 16 03:03:48 2022 +0800
[HUDI-3835] Add UT for delete in java client (#5270)
---
.../commit/TestJavaCopyOnWriteActionExecutor.java | 86 +++++++++++++++++++++-
1 file changed, 85 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index 1bf1b4cccb..518414d614 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -318,7 +318,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
}
@Test
- public void testInsertRecords() throws Exception {
+ public void testInsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
String instantTime = makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -465,6 +465,90 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
}
+ @Test
+ public void testDeleteRecords() throws Exception {
+ // Build the write config and client, then open the first commit (instant 1, formatted with "%09d")
+ HoodieWriteConfig config = makeHoodieClientConfig();
+ int startInstant = 1;
+ String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
+ HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
+ writeClient.startCommitWithTime(firstCommitTime);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+
+ String partitionPath = "2022/04/09";
+
+ // Build three records that are expected to land in the same partition (2022/04/09); all carry a 2022-04-09 timestamp
+ String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":1}";
+ String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2022-04-09T03:20:41.415Z\",\"number\":2}";
+ String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":3}";
+
+ List<HoodieRecord> records = new ArrayList<>();
+ RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
+ records.add(new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+ RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
+ records.add(new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+ RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
+ records.add(new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+ // Insert the three records under the first commit
+ writeClient.insert(records, firstCommitTime);
+
+ FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
+ assertEquals(1, allFiles.length);
+
+ // Read the bloom filter from the written file and check that it reports every inserted record key as possibly present
+ Path filePath = allFiles[0].getPath();
+ BloomFilter filter = fileUtils.readBloomFilterFromMetadata(hadoopConf, filePath);
+ for (HoodieRecord record : records) {
+ assertTrue(filter.mightContain(record.getRecordKey()));
+ }
+
+ // Read the base file and check that each row key matches the inserted record at the same position
+ List<GenericRecord> fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+ int index = 0;
+ for (GenericRecord record : fileRecords) {
+ assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
+ index++;
+ }
+
+ String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
+ writeClient.startCommitWithTime(newCommitTime);
+
+ // Delete the first and third records under a new commit
+ List<HoodieKey> keysForDelete = new ArrayList(Arrays.asList(records.get(0).getKey(), records.get(2).getKey())); // NOTE(review): raw ArrayList; consider new ArrayList<>(...)
+ writeClient.delete(keysForDelete, newCommitTime);
+
+ allFiles = getIncrementalFiles(partitionPath, "0", -1);
+ assertEquals(1, allFiles.length);
+
+ filePath = allFiles[0].getPath();
+ // Re-read the base file returned for the partition after the delete
+ fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+ // Only the second record should remain once the other two are deleted
+ assertEquals(1, fileRecords.size());
+ assertEquals(records.get(1).getRecordKey(), fileRecords.get(0).get("_row_key").toString());
+
+ newCommitTime = makeNewCommitTime(startInstant++, "%09d");
+ writeClient.startCommitWithTime(newCommitTime);
+
+ // Delete the one remaining record under another new commit
+ keysForDelete = new ArrayList(Arrays.asList(records.get(1).getKey()));
+ writeClient.delete(keysForDelete, newCommitTime);
+
+ allFiles = getIncrementalFiles(partitionPath, "0", -1);
+ assertEquals(1, allFiles.length);
+
+ filePath = allFiles[0].getPath();
+ // Re-read the base file returned for the partition after the second delete
+ fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+ // The file should now contain no records at all
+ assertEquals(0, fileRecords.size());
+ }
+
public static Map<String, Long> generateExpectedPartitionNumRecords(List<HoodieRecord> records) {
return records.stream().map(record -> Pair.of(record.getPartitionPath(), 1))
.collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));