Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/21 21:41:11 UTC
[hudi] branch master updated: [HUDI-4792] Batch clean files to delete (#6580)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cbf9b83ca6 [HUDI-4792] Batch clean files to delete (#6580)
cbf9b83ca6 is described below
commit cbf9b83ca6d3dada14eea551a5bae25144ca0459
Author: Nicolas Paris <ni...@riseup.net>
AuthorDate: Wed Sep 21 23:41:03 2022 +0200
[HUDI-4792] Batch clean files to delete (#6580)
This patch uses a single batched call to fetch the file groups to delete during cleaning, instead of one call per partition.
This limits the number of calls to the file system view and should fix the issues seen with the metadata table when a table has a large number of partitions.
Fixes issue #6373
Co-authored-by: sivabalan <n....@gmail.com>
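[Editor's note] To illustrate the idea behind the change, here is a minimal, hypothetical sketch (simplified names, not the actual Hudi classes): instead of asking the file system view for each partition's file groups one call at a time, the cleaner hands the whole partition list to the view and gets back one result per partition.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical, simplified sketch of the batching idea (not the real Hudi API).
    public class BatchCleanSketch {

      // Before: one view lookup per partition (N remote calls).
      static Map<String, List<String>> planPerPartition(List<String> partitions) {
        Map<String, List<String>> filesToDelete = new HashMap<>();
        for (String partition : partitions) {
          filesToDelete.put(partition, fetchFileGroups(partition));
        }
        return filesToDelete;
      }

      // After: a single batched lookup for all partitions (1 call).
      static Map<String, List<String>> planBatched(List<String> partitions) {
        return fetchFileGroups(partitions);
      }

      // Stand-ins for the file system view lookups.
      static List<String> fetchFileGroups(String partition) {
        return Arrays.asList(partition + "/fg1", partition + "/fg2");
      }

      static Map<String, List<String>> fetchFileGroups(List<String> partitions) {
        Map<String, List<String>> result = new HashMap<>();
        for (String p : partitions) {
          result.put(p, fetchFileGroups(p));
        }
        return result;
      }

      public static void main(String[] args) {
        System.out.println(planBatched(Arrays.asList("2022/09/20", "2022/09/21")));
      }
    }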
---
.../action/clean/CleanPlanActionExecutor.java | 11 +-
.../hudi/table/action/clean/CleanPlanner.java | 237 +++++++++++----------
...dieSparkCopyOnWriteTableArchiveWithReplace.java | 4 +-
.../table/view/AbstractTableFileSystemView.java | 16 +-
.../table/view/PriorityBasedFileSystemView.java | 5 +
.../view/RemoteHoodieTableFileSystemView.java | 12 ++
.../common/table/view/TableFileSystemView.java | 14 +-
7 files changed, 176 insertions(+), 123 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 7f3b437178..bd7ec798ed 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+ .parallelize(partitionsToClean, cleanerParallelism)
+ .mapPartitions(partitionIterator -> {
+ List<String> partitionList = new ArrayList<>();
+ partitionIterator.forEachRemaining(partitionList::add);
+ Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
+ return cleanResult.entrySet().iterator();
+ }, false).collectAsList()
.stream()
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 671a522cab..5ed53f7ae0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -60,6 +60,7 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -225,10 +226,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
* policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
* single file (i.e run it with versionsRetained = 1)
*/
- private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
- LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) {
+ LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ " file versions. ");
- List<CleanFileInfo> deletePaths = new ArrayList<>();
+ Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
@@ -236,43 +237,48 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
// In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
// In other words, the file versions only apply to the active file groups.
- deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
- boolean toDeletePartition = false;
- List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
- for (HoodieFileGroup fileGroup : fileGroups) {
- int keepVersions = config.getCleanerFileVersionsRetained();
- // do not cleanup slice required for pending compaction
- Iterator<FileSlice> fileSliceIterator =
- fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
- if (isFileGroupInPendingCompaction(fileGroup)) {
- // We have already saved the last version of file-groups for pending compaction Id
- keepVersions--;
- }
+ List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+ for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+ List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty()));
+ boolean toDeletePartition = false;
+ for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+ int keepVersions = config.getCleanerFileVersionsRetained();
+ // do not cleanup slice required for pending compaction
+ Iterator<FileSlice> fileSliceIterator =
+ fileGroup.getAllFileSlices()
+ .filter(fs -> !isFileSliceNeededForPendingCompaction(fs))
+ .iterator();
+ if (isFileGroupInPendingCompaction(fileGroup)) {
+ // We have already saved the last version of file-groups for pending compaction Id
+ keepVersions--;
+ }
- while (fileSliceIterator.hasNext() && keepVersions > 0) {
- // Skip this most recent version
- fileSliceIterator.next();
- keepVersions--;
- }
- // Delete the remaining files
- while (fileSliceIterator.hasNext()) {
- FileSlice nextSlice = fileSliceIterator.next();
- Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
- if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
- // do not clean up a savepoint data file
- continue;
+ while (fileSliceIterator.hasNext() && keepVersions > 0) {
+ // Skip this most recent version
+ fileSliceIterator.next();
+ keepVersions--;
+ }
+ // Delete the remaining files
+ while (fileSliceIterator.hasNext()) {
+ FileSlice nextSlice = fileSliceIterator.next();
+ Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
+ if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+ // do not clean up a savepoint data file
+ continue;
+ }
+ deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
}
- deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
}
+ // if there are no valid file groups for the partition, mark it to be deleted
+ if (partitionFileGroupList.getValue().isEmpty()) {
+ toDeletePartition = true;
+ }
+ map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
}
- // if there are no valid file groups for the partition, mark it to be deleted
- if (fileGroups.isEmpty()) {
- toDeletePartition = true;
- }
- return Pair.of(toDeletePartition, deletePaths);
+ return map;
}
- private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) {
return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
}
@@ -293,9 +299,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
* @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
* and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted.
*/
- private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
- LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
- List<CleanFileInfo> deletePaths = new ArrayList<>();
+ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) {
+ LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. ");
+ Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanFileInfoPerPartitionMap = new HashMap<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
@@ -307,86 +313,90 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
if (commitTimeline.countInstants() > commitsRetained) {
Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
- // all replaced file groups before earliestCommitToRetain are eligible to clean
- deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
// add active files
- List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
- for (HoodieFileGroup fileGroup : fileGroups) {
- List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
-
- if (fileSliceList.isEmpty()) {
- continue;
- }
-
- String lastVersion = fileSliceList.get(0).getBaseInstantTime();
- String lastVersionBeforeEarliestCommitToRetain =
- getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
-
- // Ensure there are more than 1 version of the file (we only clean old files from updates)
- // i.e always spare the last commit.
- for (FileSlice aSlice : fileSliceList) {
- Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
- String fileCommitTime = aSlice.getBaseInstantTime();
- if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
- // do not clean up a savepoint data file
+ List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+ for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+ List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
+ // all replaced file groups before earliestCommitToRetain are eligible to clean
+ deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
+ for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+ List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
+
+ if (fileSliceList.isEmpty()) {
continue;
}
- if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
- // Dont delete the latest commit and also the last commit before the earliest commit we
- // are retaining
- // The window of commit retain == max query run time. So a query could be running which
- // still
- // uses this file.
- if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
- // move on to the next file
- continue;
- }
- } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
- // This block corresponds to KEEP_LATEST_BY_HOURS policy
- // Do not delete the latest commit.
- if (fileCommitTime.equals(lastVersion)) {
- // move on to the next file
+ String lastVersion = fileSliceList.get(0).getBaseInstantTime();
+ String lastVersionBeforeEarliestCommitToRetain =
+ getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
+
+ // Ensure there are more than 1 version of the file (we only clean old files from updates)
+ // i.e always spare the last commit.
+ for (FileSlice aSlice : fileSliceList) {
+ Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
+ String fileCommitTime = aSlice.getBaseInstantTime();
+ if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
+ // do not clean up a savepoint data file
continue;
}
- }
- // Always keep the last commit
- if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
- .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
- // this is a commit, that should be cleaned.
- aFile.ifPresent(hoodieDataFile -> {
- deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
- if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
- deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+ if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+ // Dont delete the latest commit and also the last commit before the earliest commit we
+ // are retaining
+ // The window of commit retain == max query run time. So a query could be running which
+ // still
+ // uses this file.
+ if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+ // move on to the next file
+ continue;
+ }
+ } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+ // This block corresponds to KEEP_LATEST_BY_HOURS policy
+ // Do not delete the latest commit.
+ if (fileCommitTime.equals(lastVersion)) {
+ // move on to the next file
+ continue;
}
- });
- if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
- // If merge on read, then clean the log files for the commits as well
- Predicate<HoodieLogFile> notCDCLogFile =
- hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
- deletePaths.addAll(
- aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
- .collect(Collectors.toList()));
}
- if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
- // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
- // Here we need to clean uo these cdc log files.
- Predicate<HoodieLogFile> isCDCLogFile =
- hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
- deletePaths.addAll(
- aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
- .collect(Collectors.toList()));
+
+ // Always keep the last commit
+ if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
+ .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
+ // this is a commit, that should be cleaned.
+ aFile.ifPresent(hoodieDataFile -> {
+ deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+ if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+ deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+ }
+ });
+ if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+ // If merge on read, then clean the log files for the commits as well
+ Predicate<HoodieLogFile> notCDCLogFile =
+ hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+ deletePaths.addAll(
+ aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+ .collect(Collectors.toList()));
+ }
+ if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+ // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
+ // Here we need to clean uo these cdc log files.
+ Predicate<HoodieLogFile> isCDCLogFile =
+ hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+ deletePaths.addAll(
+ aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+ .collect(Collectors.toList()));
+ }
}
}
}
- }
- // if there are no valid file groups for the partition, mark it to be deleted
- if (fileGroups.isEmpty()) {
- toDeletePartition = true;
+ // if there are no valid file groups for the partition, mark it to be deleted
+ if (partitionFileGroupList.getValue().isEmpty()) {
+ toDeletePartition = true;
+ }
+ cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
}
}
- return Pair.of(toDeletePartition, deletePaths);
+ return cleanFileInfoPerPartitionMap;
}
/**
@@ -394,10 +404,11 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
* all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
* This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
* KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
+ *
* @param partitionPath partition path to check
* @return list of files to clean
*/
- private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
+ private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestHours(List<String> partitionPath) {
return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
}
@@ -463,21 +474,23 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
/**
* Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/
- public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
+ public Map<String, Pair<Boolean, List<CleanFileInfo>>> getDeletePaths(List<String> partitionPaths) {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
- Pair<Boolean, List<CleanFileInfo>> deletePaths;
+ Map<String, Pair<Boolean, List<CleanFileInfo>>> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
- deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
+ deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
- deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
+ deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
- deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
+ deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths);
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
}
- LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
- if (deletePaths.getKey()) {
- LOG.info("Partition " + partitionPath + " to be deleted");
+ for (String partitionPath : deletePaths.keySet()) {
+ LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath);
+ if (deletePaths.get(partitionPath).getLeft()) {
+ LOG.info("Partition " + partitionPath + " to be deleted");
+ }
}
return deletePaths;
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index baff4ebac8..967e313f4e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -57,7 +57,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
HoodieWriteConfig writeConfig = getConfigBuilder(true)
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
- // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
+ // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 8cfd92d01f..89a184bf49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -116,7 +116,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
/**
* Refresh commits timeline.
- *
+ *
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -750,6 +750,20 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}
+ @Override
+ public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+ return getAllFileGroupsIncludingReplaced(partitionPaths)
+ .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
+ }
+
+ private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
+ List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+ for (String partitionStr : partitionStrList) {
+ fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
+ }
+ return fileGroupPerPartitionList.stream();
+ }
+
private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index ff44c7cef0..9006bd45cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -204,6 +204,11 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}
+ @Override
+ public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+ return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
+ }
+
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index bd18ba22a2..5e52767fe2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -51,9 +51,11 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -377,6 +379,16 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
}
}
+ @Override
+ public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+ ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+ for (String partitionPath : partitionPaths) {
+ Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
+ fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
+ }
+ return fileGroupPerPartitionList.stream();
+ }
+
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index c32e2cabb1..9c83c8f19c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -109,18 +109,18 @@ public interface TableFileSystemView {
/**
* Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
*
- * @param partitionPath Partition path
- * @param maxCommitTime Max Instant Time
+ * @param partitionPath Partition path
+ * @param maxCommitTime Max Instant Time
* @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
- boolean includeFileSlicesInPendingCompaction);
+ boolean includeFileSlicesInPendingCompaction);
/**
* Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
* file-slice before and after compaction request instant is merged and returned.
- *
- * @param partitionPath Partition Path
+ *
+ * @param partitionPath Partition Path
* @param maxInstantTime Max Instant Time
* @return
*/
@@ -149,10 +149,12 @@ public interface TableFileSystemView {
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
+ Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
+
/**
* Return Pending Compaction Operations.
*
- * @return Pair<Pair<InstantTime,CompactionOperation>>
+ * @return Pair<Pair < InstantTime, CompactionOperation>>
*/
Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
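[Editor's note] For reference, a hedged sketch of how a caller might consume the new batched getAllFileGroups(List<String>) method added above; the view instance and partition paths are placeholders for illustration, and the iteration mirrors how CleanPlanner walks the per-partition results.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.common.model.HoodieFileGroup;
    import org.apache.hudi.common.table.view.TableFileSystemView;

    // Hypothetical caller of the new batched lookup; the view instance and
    // partition paths are placeholders for illustration.
    class BatchedViewUsageSketch {
      static void logFileGroupCounts(TableFileSystemView fileSystemView) {
        List<String> partitionPaths = Arrays.asList("2022/09/20", "2022/09/21");
        fileSystemView.getAllFileGroups(partitionPaths)
            .forEach(entry -> {
              List<HoodieFileGroup> fileGroups = entry.getRight();
              // One entry per partition; an empty list lets the cleaner mark
              // the partition itself for deletion.
              System.out.println(entry.getLeft() + " -> " + fileGroups.size() + " file groups");
            });
      }
    }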