You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/25 06:35:52 UTC

[hudi] branch master updated: [HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (#6349)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b873d8db31 [HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (#6349)
b873d8db31 is described below

commit b873d8db31892b964c06fb5a85a835768e5b38f4
Author: ChanKyeong Won <br...@gmail.com>
AuthorDate: Sun Sep 25 15:35:46 2022 +0900

    [HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (#6349)
    
    When using the repair deduplicate command with hudi-cli,
    there is no way to run it on an unpartitioned dataset,
    so the CLI parameter is modified to accept an empty partition path.
    
    Co-authored-by: Xingjun Wang <wo...@126.com>
---
 .../apache/hudi/cli/commands/RepairsCommand.java   |  2 +-
 .../hudi/cli/integ/ITTestRepairsCommand.java       | 52 ++++++++++++++++++++++
 .../common/testutils/HoodieTestDataGenerator.java  |  1 +
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index f0ff924e22..2b11e20a10 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -69,7 +69,7 @@ public class RepairsCommand {
   @ShellMethod(key = "repair deduplicate",
       value = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
   public String deduplicate(
-      @ShellOption(value = {"--duplicatedPartitionPath"}, help = "Partition Path containing the duplicates")
+      @ShellOption(value = {"--duplicatedPartitionPath"}, defaultValue = "", help = "Partition Path containing the duplicates")
       final String duplicatedPartitionPath,
       @ShellOption(value = {"--repairedOutputPath"}, help = "Location to place the repaired files")
       final String repairedOutputPath,
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 5938a8ffe2..69db47136e 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -69,6 +69,7 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
   private String duplicatedPartitionPath;
   private String duplicatedPartitionPathWithUpdates;
   private String duplicatedPartitionPathWithUpserts;
+  private String duplicatedNoPartitionPath;
   private String repairedOutputPath;
 
   private HoodieFileFormat fileFormat;
@@ -78,6 +79,7 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
     duplicatedPartitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
     duplicatedPartitionPathWithUpdates = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
     duplicatedPartitionPathWithUpserts = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    duplicatedNoPartitionPath = HoodieTestDataGenerator.NO_PARTITION_PATH;
     repairedOutputPath = Paths.get(basePath, "tmp").toString();
 
     HoodieCLI.conf = jsc.hadoopConfiguration();
@@ -135,6 +137,23 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
         .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
         .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);
 
+    // init cow table for non-partitioned table tests
+    String cowNonPartitionedTablePath = Paths.get(basePath, "cow_table_non_partitioned").toString();
+
+    // Create cow table and connect
+    new TableCommand().createTable(
+        cowNonPartitionedTablePath, "cow_table_non_partitioned", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+
+    cowNonPartitionedTable.addCommit("20160401010101")
+        .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1", hoodieRecords1)
+        .getFileIdWithLogFile(HoodieTestDataGenerator.NO_PARTITION_PATH);
+
+    cowNonPartitionedTable.addCommit("20160401010202")
+        .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "2", dupRecords);
+
     fileFormat = metaClient.getTableConfig().getBaseFileFormat();
   }
 
@@ -232,6 +251,39 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
     assertEquals(100, result.count());
   }
 
+  /**
+   * Test case dry run deduplicate for non-partitioned dataset.
+   */
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  public void testDeduplicateNoPartitionWithInserts(HoodieTableType tableType) throws IOException {
+    String tablePath = Paths.get(basePath, "cow_table_non_partitioned").toString();
+    connectTableAndReloadMetaClient(tablePath);
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(Paths.get(tablePath, duplicatedNoPartitionPath).toString())));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(2, filteredStatuses.size(), "There should be 2 files.");
+
+    // Before deduplicate, all files contain 110 records
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = readFiles(files);
+    assertEquals(110, df.count());
+
+    // use default value without specifying duplicatedPartitionPath
+    String cmdStr = String.format("repair deduplicate --repairedOutputPath %s --sparkMaster %s",
+        repairedOutputPath, "local");
+    Object resultForCmd = shell.evaluate(() -> cmdStr);
+    assertTrue(ShellEvaluationResultUtil.isSuccess(resultForCmd));
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, resultForCmd.toString());
+
+    // After deduplicate, there are 100 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = readFiles(files);
+    assertEquals(100, result.count());
+  }
+
   /**
    * Test case for real run deduplicate.
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 81a1f32ca2..8614060126 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -89,6 +89,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
   // with default bloom filter with 60,000 entries and 0.000000001 FPRate
   public static final int BLOOM_FILTER_BYTES = 323495;
   private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class);
+  public static final String NO_PARTITION_PATH = "";
   public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
   public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
   public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";