Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:30:10 UTC

[hudi] 02/11: [HUDI-3835] Add UT for delete in java client (#5270)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7bc940c7e4fb26dfa850689161dec754005b406b
Author: 董可伦 <do...@inspur.com>
AuthorDate: Sat Apr 16 03:03:48 2022 +0800

    [HUDI-3835] Add UT for delete in java client (#5270)
---
 .../commit/TestJavaCopyOnWriteActionExecutor.java  | 86 +++++++++++++++++++++-
 1 file changed, 85 insertions(+), 1 deletion(-)
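
For reference, a minimal sketch (not part of the commit) of the insert-then-delete flow that the new testDeleteRecords case exercises. The client calls below (startCommitWithTime, insert, delete) appear in the diff; the helper's name and signature are illustrative assumptions, and in the test itself the write client comes from the HoodieJavaClientTestBase harness via getHoodieWriteClient(makeHoodieClientConfig()).

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hudi.client.HoodieJavaWriteClient;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;

    public class DeleteFlowSketch {

      // Hypothetical helper, not part of the commit: shows the insert-then-delete
      // sequence that testDeleteRecords drives against the Java write client.
      static <T extends HoodieRecordPayload> void insertThenDelete(
          HoodieJavaWriteClient<T> writeClient,
          List<HoodieRecord<T>> records,
          String insertCommitTime,
          String deleteCommitTime) {

        // First commit: insert the records.
        writeClient.startCommitWithTime(insertCommitTime);
        writeClient.insert(records, insertCommitTime);

        // Second commit: delete by key only; no payload is needed for deletes.
        List<HoodieKey> keysToDelete = records.stream()
            .map(HoodieRecord::getKey)
            .collect(Collectors.toList());
        writeClient.startCommitWithTime(deleteCommitTime);
        writeClient.delete(keysToDelete, deleteCommitTime);
      }
    }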

diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index 1bf1b4cccb..518414d614 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -318,7 +318,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
   }
 
   @Test
-    public void testInsertRecords() throws Exception {
+  public void testInsertRecords() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfig();
     String instantTime = makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -465,6 +465,90 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
     verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
   }
 
+  @Test
+  public void testDeleteRecords() throws Exception {
+    // Prepare the write config and write client
+    HoodieWriteConfig config = makeHoodieClientConfig();
+    int startInstant = 1;
+    String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
+    HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+
+    String partitionPath = "2022/04/09";
+
+    // Get some records belonging to the same partition (2022/04/09)
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+            + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":1}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+            + "\"time\":\"2022-04-09T03:20:41.415Z\",\"number\":2}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+            + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":3}";
+
+    List<HoodieRecord> records = new ArrayList<>();
+    RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
+    records.add(new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+    RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
+    records.add(new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+    RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
+    records.add(new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+    // Insert new records
+    writeClient.insert(records, firstCommitTime);
+
+    FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    // Read back the bloom filter and make sure it can answer whether each record exists
+    Path filePath = allFiles[0].getPath();
+    BloomFilter filter = fileUtils.readBloomFilterFromMetadata(hadoopConf, filePath);
+    for (HoodieRecord record : records) {
+      assertTrue(filter.mightContain(record.getRecordKey()));
+    }
+
+    // Read the base file, check the record content
+    List<GenericRecord> fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+    int index = 0;
+    for (GenericRecord record : fileRecords) {
+      assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
+      index++;
+    }
+
+    String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
+    writeClient.startCommitWithTime(newCommitTime);
+
+    // Delete two of the three records
+    List<HoodieKey> keysForDelete = new ArrayList<>(Arrays.asList(records.get(0).getKey(), records.get(2).getKey()));
+    writeClient.delete(keysForDelete, newCommitTime);
+
+    allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    filePath = allFiles[0].getPath();
+    // Read the base file, check the record content
+    fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+    // Check that the two records are deleted successfully
+    assertEquals(1, fileRecords.size());
+    assertEquals(records.get(1).getRecordKey(), fileRecords.get(0).get("_row_key").toString());
+
+    newCommitTime = makeNewCommitTime(startInstant++, "%09d");
+    writeClient.startCommitWithTime(newCommitTime);
+
+    // Delete the last remaining record
+    keysForDelete = new ArrayList<>(Arrays.asList(records.get(1).getKey()));
+    writeClient.delete(keysForDelete, newCommitTime);
+
+    allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    filePath = allFiles[0].getPath();
+    // Read the base file, check the record content
+    fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+    // Check whether all records have been deleted
+    assertEquals(0, fileRecords.size());
+  }
+
   public static Map<String, Long> generateExpectedPartitionNumRecords(List<HoodieRecord> records) {
     return records.stream().map(record -> Pair.of(record.getPartitionPath(), 1))
         .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));