Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/08 23:47:37 UTC

[GitHub] [hudi] satishkotha commented on a change in pull request #2422: [WIP] [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible

satishkotha commented on a change in pull request #2422:
URL: https://github.com/apache/hudi/pull/2422#discussion_r554244115



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(

Review comment:
       I think replace commit metadata is stored in JSON format in the active timeline (only the archived timeline uses Avro, as with other commits). Can you double-check while adding tests? You may need a similar change in CleanPlanner.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }
 
+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {

Review comment:
       fileSystemView#getReplacedFileGroupsBeforeOrOn looks similar (we may have to add another method, 'getReplacedFileGroupsBefore', to enforce strict inequality). Maybe we can reuse that code? I can do this later if we file a code cleanup task.
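The difference between the inclusive lookup and the proposed strict one can be sketched in isolation. This is only an illustration, not Hudi's implementation: `ReplacedFileGroupLookup`, its methods, and the fileId-to-replace-instant map are hypothetical, and it assumes instant timestamps are fixed-width strings that order lexicographically.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Hudi: one shared filter backs both the
// inclusive ("before or on") and the strict ("before") variants.
class ReplacedFileGroupLookup {
  // fileId -> timestamp of the replace commit that replaced it (assumed shape).
  private final Map<String, String> replacedAt;

  ReplacedFileGroupLookup(Map<String, String> replacedAt) {
    this.replacedAt = new TreeMap<>(replacedAt); // sorted by fileId for stable output
  }

  // Inclusive: file groups replaced at or before maxInstant.
  List<String> replacedBeforeOrOn(String maxInstant) {
    return select(ts -> ts.compareTo(maxInstant) <= 0);
  }

  // Strict: file groups replaced before maxInstant only.
  List<String> replacedBefore(String maxInstant) {
    return select(ts -> ts.compareTo(maxInstant) < 0);
  }

  private List<String> select(Predicate<String> timestampFilter) {
    return replacedAt.entrySet().stream()
        .filter(e -> timestampFilter.test(e.getValue()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }
}
```

With this shape the cleaner would call the strict variant with the earliest instant to retain, so a replace commit at exactly that instant is left alone.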

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }
 
+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
+      // replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+      // In other words, the file versions only apply to the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+    }
+
+    // merge everything and make a map full of file ids to be cleaned.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // duplicates should nt be possible; since replace of a file group should happen once only
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)

Review comment:
       I think getAllFileGroups doesn't return replaced file groups. It looks like we may have to rename it to getAllActiveFileGroups to avoid confusion. Could you make getAllFileGroupsIncludingReplaced public and use it?
   We could also use getReplacedFileGroupsBeforeOrOn (or add the new method mentioned above), which returns HoodieFileGroups.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }
 
+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
+      // replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+      // In other words, the file versions only apply to the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+    }
+
+    // merge everything and make a map full of file ids to be cleaned.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // duplicates should nt be possible; since replace of a file group should happen once only
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
+        .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
+        .flatMap(HoodieFileGroup::getAllFileSlices)
+        .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
+        .collect(Collectors.toList());

Review comment:
       On line 220 we honor savepointed files when cleaning regular commits, but we might be missing that for replace commits. Is that fine for now?
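One way to carry the savepoint check over to replaced file groups is to filter the candidate paths before they reach the clean plan. The sketch below is an assumption about shape only: `SavepointAwareFilter` and `excludeSavepointed` are hypothetical names, and files are represented as plain strings rather than Hudi's own types.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch, not Hudi code: drop any candidate path that a
// savepoint still references before handing the list to the cleaner.
class SavepointAwareFilter {
  static List<String> excludeSavepointed(List<String> candidatePaths, Set<String> savepointedPaths) {
    return candidatePaths.stream()
        .filter(path -> !savepointedPaths.contains(path))
        .collect(Collectors.toList());
  }
}
```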

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+        break;
       default:
         throw new HoodieException("Unknown type of action " + instant.getAction());
     }
 
     return records;
   }
 
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {

Review comment:
       If HoodieReplaceCommitMetadata is in JSON format, we could reuse convertMetadataToRecords(HoodieCommitMetadata, String).

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -81,9 +83,21 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
 
       context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
 
-      Map<String, List<HoodieCleanFileInfo>> cleanOps = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
-          .stream()
+      // Compute the file paths, to be cleaned in each valid file group
+      Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
+          partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),

Review comment:
       Is it possible to have a single call to planner.getDeletePaths return all files to be cleaned? That seems like a better high-level abstraction to me. I'm not sure whether there are disadvantages to separating them.
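A rough sketch of the single-entry-point idea, under stated assumptions: `UnifiedDeletePaths` is a hypothetical stand-in for the planner, the two `Function` fields stand in for the per-partition lookups of active and replaced file groups, and paths are plain strings rather than `CleanFileInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not Hudi code: one public entry point that merges the
// delete paths from active and replaced file groups for a partition.
class UnifiedDeletePaths {
  private final Function<String, List<String>> activeFileGroupPaths;
  private final Function<String, List<String>> replacedFileGroupPaths;

  UnifiedDeletePaths(Function<String, List<String>> activeFileGroupPaths,
                     Function<String, List<String>> replacedFileGroupPaths) {
    this.activeFileGroupPaths = activeFileGroupPaths;
    this.replacedFileGroupPaths = replacedFileGroupPaths;
  }

  // Callers see a single list; how it is assembled stays inside the planner.
  List<String> getDeletePaths(String partitionPath) {
    List<String> all = new ArrayList<>(activeFileGroupPaths.apply(partitionPath));
    all.addAll(replacedFileGroupPaths.apply(partitionPath));
    return all;
  }
}
```

The executor would then map each partition through one call, and the split between active and replaced file groups would not leak into the plan-building code.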



