You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/25 06:35:52 UTC
[hudi] branch master updated: [HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (#6349)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b873d8db31 [HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (#6349)
b873d8db31 is described below
commit b873d8db31892b964c06fb5a85a835768e5b38f4
Author: ChanKyeong Won <br...@gmail.com>
AuthorDate: Sun Sep 25 15:35:46 2022 +0900
[HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (#6349)
When using the repair deduplicate command with hudi-cli,
there is no way to run it on an unpartitioned dataset,
so the CLI parameter is modified to accept an empty default partition path.
Co-authored-by: Xingjun Wang <wo...@126.com>
---
.../apache/hudi/cli/commands/RepairsCommand.java | 2 +-
.../hudi/cli/integ/ITTestRepairsCommand.java | 52 ++++++++++++++++++++++
.../common/testutils/HoodieTestDataGenerator.java | 1 +
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index f0ff924e22..2b11e20a10 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -69,7 +69,7 @@ public class RepairsCommand {
@ShellMethod(key = "repair deduplicate",
value = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
public String deduplicate(
- @ShellOption(value = {"--duplicatedPartitionPath"}, help = "Partition Path containing the duplicates")
+ @ShellOption(value = {"--duplicatedPartitionPath"}, defaultValue = "", help = "Partition Path containing the duplicates")
final String duplicatedPartitionPath,
@ShellOption(value = {"--repairedOutputPath"}, help = "Location to place the repaired files")
final String repairedOutputPath,
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 5938a8ffe2..69db47136e 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -69,6 +69,7 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
private String duplicatedPartitionPath;
private String duplicatedPartitionPathWithUpdates;
private String duplicatedPartitionPathWithUpserts;
+ private String duplicatedNoPartitionPath;
private String repairedOutputPath;
private HoodieFileFormat fileFormat;
@@ -78,6 +79,7 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
duplicatedPartitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
duplicatedPartitionPathWithUpdates = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
duplicatedPartitionPathWithUpserts = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+ duplicatedNoPartitionPath = HoodieTestDataGenerator.NO_PARTITION_PATH;
repairedOutputPath = Paths.get(basePath, "tmp").toString();
HoodieCLI.conf = jsc.hadoopConfiguration();
@@ -135,6 +137,23 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords);
+ // init cow table for non-partitioned table tests
+ String cowNonPartitionedTablePath = Paths.get(basePath, "cow_table_non_partitioned").toString();
+
+ // Create cow table and connect
+ new TableCommand().createTable(
+ cowNonPartitionedTablePath, "cow_table_non_partitioned", HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+
+ cowNonPartitionedTable.addCommit("20160401010101")
+ .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1", hoodieRecords1)
+ .getFileIdWithLogFile(HoodieTestDataGenerator.NO_PARTITION_PATH);
+
+ cowNonPartitionedTable.addCommit("20160401010202")
+ .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "2", dupRecords);
+
fileFormat = metaClient.getTableConfig().getBaseFileFormat();
}
@@ -232,6 +251,39 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
assertEquals(100, result.count());
}
+ /**
+ * Test case dry run deduplicate for non-partitioned dataset.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testDeduplicateNoPartitionWithInserts(HoodieTableType tableType) throws IOException {
+ String tablePath = Paths.get(basePath, "cow_table_non_partitioned").toString();
+ connectTableAndReloadMetaClient(tablePath);
+ HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+ metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+ fs.listStatus(new Path(Paths.get(tablePath, duplicatedNoPartitionPath).toString())));
+ List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ assertEquals(2, filteredStatuses.size(), "There should be 2 files.");
+
+ // Before deduplicate, all files contain 110 records
+ String[] files = filteredStatuses.toArray(new String[0]);
+ Dataset df = readFiles(files);
+ assertEquals(110, df.count());
+
+ // use default value without specifying duplicatedPartitionPath
+ String cmdStr = String.format("repair deduplicate --repairedOutputPath %s --sparkMaster %s",
+ repairedOutputPath, "local");
+ Object resultForCmd = shell.evaluate(() -> cmdStr);
+ assertTrue(ShellEvaluationResultUtil.isSuccess(resultForCmd));
+ assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, resultForCmd.toString());
+
+ // After deduplicate, there are 100 records
+ FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+ files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+ Dataset result = readFiles(files);
+ assertEquals(100, result.count());
+ }
+
/**
* Test case for real run deduplicate.
*/
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 81a1f32ca2..8614060126 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -89,6 +89,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
// with default bloom filter with 60,000 entries and 0.000000001 FPRate
public static final int BLOOM_FILTER_BYTES = 323495;
private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class);
+ public static final String NO_PARTITION_PATH = "";
public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";