You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labelled "here") is not preserved in this text version.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/26 03:21:45 UTC
[hudi] branch master updated: [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4e42ed5eae [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676)
4e42ed5eae is described below
commit 4e42ed5eae36f706ec35e5d09d8dc206b7fea130
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu May 26 11:21:39 2022 +0800
[HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 46 ++++++++++++++--------
1 file changed, 30 insertions(+), 16 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f111bb70ef..c53554d8e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -506,13 +506,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
if (instantsToStream != null) {
- // sorts the instants in natural order to make sure the metadata files be removed
- // in HoodieInstant.State sequence: requested -> inflight -> completed,
- // this is important because when a COMPLETED metadata file is removed first,
- // other monitors on the timeline(such as the compaction or clustering services) would
- // mistakenly recognize the pending file as a pending operation,
- // then all kinds of weird bugs occur.
- return instantsToStream.stream().sorted();
+ return instantsToStream.stream();
} else {
// if a concurrent writer archived the instant
return Stream.empty();
@@ -522,19 +516,29 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants);
- boolean success = true;
- List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
- new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
- ).map(Path::toString).collect(Collectors.toList());
- context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
- Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
+ List<String> pendingInstantFiles = new ArrayList<>();
+ List<String> completedInstantFiles = new ArrayList<>();
- for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
- LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
- success &= result.getValue();
+ for (HoodieInstant instant : archivedInstants) {
+ String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
+ if (instant.isCompleted()) {
+ completedInstantFiles.add(filePath);
+ } else {
+ pendingInstantFiles.add(filePath);
+ }
}
+ context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
+ // Delete the metadata files
+ // in HoodieInstant.State sequence: requested -> inflight -> completed,
+ // this is important because when a COMPLETED metadata file is removed first,
+ // other monitors on the timeline(such as the compaction or clustering services) would
+ // mistakenly recognize the pending file as a pending operation,
+ // then all kinds of weird bugs occur.
+ boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
+ success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);
+
// Remove older meta-data from auxiliary path too
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
@@ -545,6 +549,16 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
return success;
}
+ /**
+ * Deletes the given instant metadata files via deleteFilesParallelize, logs each
+ * per-file result, and folds every result into the supplied success flag.
+ *
+ * NOTE(review): deletion is attempted even when {@code success} is already false;
+ * the incoming flag only affects the returned value, not whether these files are deleted.
+ *
+ * @param context engine context passed through to deleteFilesParallelize
+ * @param success result accumulated so far; if it is false the method returns false
+ * @param files string paths of the metadata files to delete
+ * @return true only if {@code success} was true and every entry in the deletion result map is true
+ */
+ private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
+ // NOTE(review): the trailing false is presumably an "ignore failures" flag — confirm against deleteFilesParallelize
+ Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);
+
+ for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
+ LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
+ // any single false entry makes the overall result false
+ success &= result.getValue();
+ }
+ return success;
+ }
+
/**
* Remove older instants from auxiliary meta folder.
*