Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/09 03:52:22 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #2421: [HUDI-1502] MOR rollback and restore support for metadata sync

vinothchandar commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r554264580



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
         // Since the actual log file written to can be different based on when rollover happens, we use the
         // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
         // writers.
+        // TODO? are these sufficient?

Review comment:
       Let's answer this and either resolve it or fix it.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -129,9 +131,23 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
           1L);
     }
 
-    return HoodieRollbackStat.newBuilder()
+    HoodieRollbackStat.Builder builder = HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback);
+    if (probableLogFileMap != null) {
+      builder.withProbableLogFileToSizeMap(probableLogFileMap);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns probable log files for the respective baseCommitTime to assist in metadata table syncing.
+   * @param partitionPath partition path of interest
+   * @param baseCommitTime base commit time of interest
+   * @return Map<FileStatus, File size>
+   * @throws IOException
+   */
+  protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {

Review comment:
       why probable? is there a chance that this can be wrong?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -129,9 +130,23 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
           1L);
     }
 
-    return HoodieRollbackStat.newBuilder()
+    HoodieRollbackStat.Builder builder = HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback);
+    if (probableLogFileMap != null) {
+      builder.withProbableLogFileToSizeMap(probableLogFileMap);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns probable log files for the respective baseCommitTime to assist in metadata table syncing.
+   * @param partitionPath partition path of interest
+   * @param baseCommitTime base commit time of interest
+   * @return Map<FileStatus, File size>
+   * @throws IOException
+   */
+  protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {
+    return null;

Review comment:
       Collections.EMPTY_MAP
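
       For instance, a minimal sketch of that default (the typed `Collections.emptyMap()` variant avoids an unchecked assignment):

           // Default for strategies that do not track log files; subclasses override.
           // Returning an empty map instead of null spares every caller a null check.
           protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {
             return Collections.emptyMap();
           }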

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -794,7 +788,19 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
         if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
           LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
           LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+
+          for (String fileName : fsFileNames) {
+            if (!metadataFilenames.contains(fileName)) {
+              LOG.error(partition + ": FsFilename " + fileName + " not found in metadata");

Review comment:
       debug code?
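
       If the listing diff is worth keeping for diagnosis, one option (a sketch, not the PR's code) is to demote it to debug level and guard the loop so it costs nothing in normal runs:

           if (LOG.isDebugEnabled()) {
             for (String fileName : fsFileNames) {
               if (!metadataFilenames.contains(fileName)) {
                 LOG.debug("Partition " + partition + ": FS file " + fileName + " not found in metadata listing");
               }
             }
           }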

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -225,25 +216,23 @@
   /**
    * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
    *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This function will extract this change file for each partition.
    *
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
    */
   private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                              Map<String, List<String>> partitionToDeletedFiles,
-                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                              Option<String> lastSyncTs) {
+      Map<String, List<String>> partitionToDeletedFiles,
+      Map<String, Map<String, Long>> partitionToAppendedFiles,
+      Option<String> lastSyncTs) {
 
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
       // Has this rollback produced new files?
-      boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+      boolean hasAppendFiles = pm.getRollbackLogFiles() != null ? pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0 : false;

Review comment:
       Let's have one boolean for each. And can we deal with empty maps instead of nulls?
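
       A sketch of that shape (assuming `getWrittenLogFiles()` is the getter generated from the new `writtenLogFiles` schema field, and that metadata written by older versions may still deserialize these maps as null):

           // Normalize possibly-null maps from older metadata versions, then keep one flag per kind.
           Map<String, Long> rollbackLogFiles = pm.getRollbackLogFiles() == null
               ? Collections.emptyMap() : pm.getRollbackLogFiles();
           Map<String, Long> writtenLogFiles = pm.getWrittenLogFiles() == null
               ? Collections.emptyMap() : pm.getWrittenLogFiles();
           boolean hasRollbackLogFiles = rollbackLogFiles.values().stream().mapToLong(Long::longValue).sum() > 0;
           boolean hasWrittenLogFiles = writtenLogFiles.values().stream().mapToLong(Long::longValue).sum() > 0;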

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -98,7 +98,7 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
    * @return stats collected with or w/o actual deletions.
    */
   JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests,
-                                                                     int sparkPartitions, boolean doDelete) {
+      int sparkPartitions, boolean doDelete) {

Review comment:
       revert this?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
##########
@@ -129,9 +130,23 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant
           1L);
     }
 
-    return HoodieRollbackStat.newBuilder()
+    HoodieRollbackStat.Builder builder = HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback);
+    if (probableLogFileMap != null) {

Review comment:
       we should return an empty map from the method, instead of relying on null. 
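
       With an empty-map default in place, the builder call collapses back to a single chain, e.g. (sketch):

           return HoodieRollbackStat.newBuilder()
               .withPartitionPath(partitionPath)
               .withRollbackBlockAppendResults(filesToNumBlocksRollback)
               .withProbableLogFileToSizeMap(probableLogFileMap) // now guaranteed non-null
               .build();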

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
##########
@@ -75,4 +84,23 @@ public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>,
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
+
+  protected Map<FileStatus, Long> getProbableFileSizeMap(String partitionPath, String baseCommitTime) throws IOException {
+    // collect all log files that are supposed to be deleted with this rollback

Review comment:
       This is standard code that's already available. Please see the other comment.

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -31,18 +31,22 @@
             {"name": "partitionPath", "type": "string"},
             {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
             {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
-            {"name": "appendFiles", "type": {
+            {"name": "rollbackLogFiles", "type": {
                 "type": "map",
-                "doc": "Files to which append blocks were written",
                 "values": {
                     "type": "long",
                     "doc": "Size of this file in bytes"
                 }
+            }},
+            {"name": "writtenLogFiles", "type": {

Review comment:
       why do we call this `writtenLogFiles` and elsewhere we just call this `probable...`? 

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that are supposed to be deleted with this rollback
+          String baseCommit = rollbackRequest.getLatestBaseInstant().get();

Review comment:
       `baseInstantTime`

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -225,25 +217,23 @@
   /**
    * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
    *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This function will extract this change file for each partition.
    *
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
    */
   private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                              Map<String, List<String>> partitionToDeletedFiles,
-                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                              Option<String> lastSyncTs) {
+      Map<String, List<String>> partitionToDeletedFiles,

Review comment:
       Can we revert these unnecessary formatting changes?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +117,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that are supposed to be deleted with this rollback
+          String baseCommit = rollbackRequest.getLatestBaseInstant().get();
+          SerializablePathFilter filter = (path) -> {
+            if (FSUtils.isLogFile(path)) {
+              // Since the baseCommitTime is the only commit for new log files, it's okay here
+              String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+              return baseCommit.equals(fileCommitTime);
+            }
+            return false;
+          };
+
+          final Map<FileStatus, Long> probableLogFileMap = new HashMap<>();
+          FileSystem fs = metaClient.getFs();
+          FileStatus[] probableLogFiles = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), filter);

Review comment:
       I guess there is no way to avoid this listing per se.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -102,8 +102,6 @@
   /**
    * Finds all new files/partitions created as part of commit and creates metadata table records for them.
    *
-   * @param commitMetadata

Review comment:
       why these changes?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
##########
@@ -70,18 +70,20 @@ public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbac
     Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
     int totalDeleted = 0;
     for (HoodieRollbackStat stat : rollbackStats) {
-      Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+      Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
+          .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
+      Map<String, Long> probableLogFiles = stat.getProbableLogFileToSizeMap().keySet().stream()

Review comment:
       Let's stick to one consistent term, please.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -262,13 +251,26 @@ private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
         partitionToDeletedFiles.get(partition).addAll(deletedFiles);
       }
 
-      if (!pm.getAppendFiles().isEmpty()) {
+      if (pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty()) {
         if (!partitionToAppendedFiles.containsKey(partition)) {
           partitionToAppendedFiles.put(partition, new HashMap<>());
         }
 
         // Extract appended file name from the absolute paths saved in getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
+        pm.getRollbackLogFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return size + oldSize;

Review comment:
       This size needs to be updated, right? Is it correct to add this?
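
       If the value recorded here is the absolute length of the log file (as the schema doc says, "Size of this file in bytes"), then summing two observations of the same file double-counts it; keeping the larger (latest) snapshot may be what is intended. A sketch of that alternative:

           // Sizes are absolute file lengths, not deltas: prefer the larger
           // (more recent) observation over summing two snapshots of one file.
           pm.getRollbackLogFiles().forEach((path, size) ->
               partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, Math::max));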

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
##########
@@ -56,9 +56,9 @@
   private static final Integer DEFAULT_VERSION = 1;
 
   public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime,
-                                                             long durationInMs,
-                                                             List<HoodieInstant> instants,
-                                                             Map<String, List<HoodieRollbackMetadata>> instantToRollbackMetadata) {
+      long durationInMs,

Review comment:
       revert

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
         // Since the actual log file written to can be different based on when rollover happens, we use the
         // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
         // writers.
+        // TODO? are these sufficient?

Review comment:
       The cleanest way is to have a marker per file created. That way we can avoid listing anything at all; we can just compute the files from the marker folder directly.
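
       Roughly, the idea looks like this (helper names below are illustrative, not the actual marker API): the write path drops one marker per log file it creates, and rollback enumerates the marker directory instead of listing the partition:

           // Illustrative sketch only -- helper names are hypothetical.
           // Write path: record a marker for every log file created.
           markerFiles.create(partitionPath, logFile.getFileName(), IOType.APPEND);

           // Rollback path: derive affected log files purely from markers, no FS listing.
           List<String> logFilesToRollback = markerFiles.allMarkerFilePaths().stream()
               .filter(marker -> marker.endsWith(IOType.APPEND.name()))
               .collect(Collectors.toList());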

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that are supposed to be deleted with this rollback
+          String baseCommit = rollbackRequest.getLatestBaseInstant().get();

Review comment:
       all of this code is pretty much replaced by FSUtils.getAllLogFiles()?
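
       A sketch of that replacement, assuming the rollback request exposes a file id and that `FSUtils.getAllLogFiles` keeps its `(fs, partitionPath, fileId, extension, baseCommitTime)` shape:

           // Reuse the existing helper instead of a hand-rolled PathFilter + listStatus.
           Map<FileStatus, Long> probableLogFileMap = FSUtils.getAllLogFiles(
                   metaClient.getFs(),
                   FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
                   rollbackRequest.getFileId().get(),
                   HoodieLogFile.DELTA_EXTENSION,
                   baseCommit)
               .collect(Collectors.toMap(HoodieLogFile::getFileStatus, lf -> lf.getFileStatus().getLen()));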

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
##########
@@ -70,18 +70,20 @@ public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbac
     Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
     int totalDeleted = 0;
     for (HoodieRollbackStat stat : rollbackStats) {
-      Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+      Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
+          .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
+      Map<String, Long> probableLogFiles = stat.getProbableLogFileToSizeMap().keySet().stream()

Review comment:
       I think you want to capture the fact that this is all the log files. Let's just call it `allLogFiles` then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org