You are viewing a plain text version of this content. The canonical link for it is here (the link itself was not preserved in this plain-text copy).
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/26 03:21:45 UTC

[hudi] branch master updated: [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e42ed5eae [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676)
4e42ed5eae is described below

commit 4e42ed5eae36f706ec35e5d09d8dc206b7fea130
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu May 26 11:21:39 2022 +0800

    [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 46 ++++++++++++++--------
 1 file changed, 30 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f111bb70ef..c53554d8e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -506,13 +506,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
       List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
                 HoodieInstant.getComparableAction(hoodieInstant.getAction())));
       if (instantsToStream != null) {
-        // sorts the instants in natural order to make sure the metadata files be removed
-        // in HoodieInstant.State sequence: requested -> inflight -> completed,
-        // this is important because when a COMPLETED metadata file is removed first,
-        // other monitors on the timeline(such as the compaction or clustering services) would
-        // mistakenly recognize the pending file as a pending operation,
-        // then all kinds of weird bugs occur.
-        return instantsToStream.stream().sorted();
+        return instantsToStream.stream();
       } else {
         // if a concurrent writer archived the instant
         return Stream.empty();
@@ -522,19 +516,29 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
 
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
-    boolean success = true;
-    List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
-        new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
-    ).map(Path::toString).collect(Collectors.toList());
 
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
-    Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
+    List<String> pendingInstantFiles = new ArrayList<>();
+    List<String> completedInstantFiles = new ArrayList<>();
 
-    for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
-      LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
-      success &= result.getValue();
+    for (HoodieInstant instant : archivedInstants) {
+      String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
+      if (instant.isCompleted()) {
+        completedInstantFiles.add(filePath);
+      } else {
+        pendingInstantFiles.add(filePath);
+      }
     }
 
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
+    // Delete the metadata files
+    // in HoodieInstant.State sequence: requested -> inflight -> completed,
+    // this is important because when a COMPLETED metadata file is removed first,
+    // other monitors on the timeline(such as the compaction or clustering services) would
+    // mistakenly recognize the pending file as a pending operation,
+    // then all kinds of weird bugs occur.
+    boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
+    success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);
+
     // Remove older meta-data from auxiliary path too
     Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
         || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
@@ -545,6 +549,16 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     return success;
   }
 
+  private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
+    Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);
+    // Returns the incoming `success` flag ANDed with every per-file delete result, logging each file's outcome.
+    for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
+      LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
+      success &= result.getValue(); // a single false result makes the whole call return false
+    }
+    return success;
+  }
+  }
+
   /**
    * Remove older instants from auxiliary meta folder.
    *