Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/29 21:26:18 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4716: [HUDI-3322][HUDI-3343] Fixing Metadata Table Records Duplication Issues

nsivabalan commented on a change in pull request #4716:
URL: https://github.com/apache/hudi/pull/4716#discussion_r795093095



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -90,42 +86,41 @@ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineCo
                 Collections.singletonList(fullDeletePath.toString()),
                 Collections.emptyMap());
           case APPEND:
+            // NOTE: This marker file-path does NOT correspond to a log-file, but rather is a phony
+            //       path serving as a "container" for the following components:
+            //          - Base file's file-id
+            //          - Base file's commit instant
+            //          - Partition path
             return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
           default:
             throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
         }
-      }, parallelism).stream().collect(Collectors.toList());
+      }, parallelism);
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException {
+    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
-    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) {
-      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
-    }
-    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
+    String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), baseFilePathForAppend.getParent());
+    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath);
+
+    // NOTE: Since we're rolling back an incomplete Delta Commit, it could only have appended
+    //       its block to the latest log-file
+    // TODO(HUDI-1517) use provided marker-file's path instead
+    HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get();
+
+    // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended,
+    //       therefore we simply stub this value.
+    Map<String, Long> logFilesWithBlocsToRollback =

Review comment:
    I was also wondering whether we should fix the way we store the file size in HoodieMetadataPayload:
   ```
    HoodieMetadataFileInfo(long size, boolean isFullSize, boolean isDelete)
   ```
    With CommitMetadata, these would represent delta sizes, and during rollback/restore they would represent full sizes. We could then combine multiple metadata records based on whether each carries a delta or a full size.
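    A minimal sketch of how such records could combine (everything beyond the constructor above is hypothetical, not the actual HoodieMetadataPayload API):
    ```java
    class HoodieMetadataFileInfo {
      final long size;          // delta size for commits, full size for rollback/restore
      final boolean isFullSize; // true => `size` is authoritative, not additive
      final boolean isDelete;   // true => the file was deleted

      HoodieMetadataFileInfo(long size, boolean isFullSize, boolean isDelete) {
        this.size = size;
        this.isFullSize = isFullSize;
        this.isDelete = isDelete;
      }

      // Combine an older record with a newer one: a delete wins outright, a full
      // size resets the running total, a delta size accumulates on top of it.
      static HoodieMetadataFileInfo merge(HoodieMetadataFileInfo older, HoodieMetadataFileInfo newer) {
        if (newer.isDelete || newer.isFullSize) {
          return newer;
        }
        return new HoodieMetadataFileInfo(older.size + newer.size, older.isFullSize, false);
      }
    }
    ```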
   

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -38,14 +38,6 @@
                     "type": "long",
                     "doc": "Size of this file in bytes"
                 }
-            }], "default":null },

Review comment:
    This may not be backwards compatible when reading rollback metadata written with 0.10.0 or earlier versions.
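    One way to verify the concern is Avro's built-in compatibility checker. A minimal sketch, assuming the two `.avsc` file names below stand in for the 0.10.0 schema (writer) and the schema after this change (reader):
    ```java
    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;

    public class RollbackSchemaCompatCheck {
      public static void main(String[] args) throws Exception {
        // Hypothetical file names; point these at the old and new HoodieRollbackMetadata.avsc.
        Schema writerSchema = new Schema.Parser().parse(new File("HoodieRollbackMetadata-0.10.0.avsc"));
        Schema readerSchema = new Schema.Parser().parse(new File("HoodieRollbackMetadata-new.avsc"));

        // COMPATIBLE means rollback metadata written by 0.10.0 can still be decoded
        // with the new schema; anything else confirms the concern above.
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
        System.out.println(result.getType());
      }
    }
    ```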

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -90,42 +86,41 @@ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineCo
                 Collections.singletonList(fullDeletePath.toString()),
                 Collections.emptyMap());
           case APPEND:
+            // NOTE: This marker file-path does NOT correspond to a log-file, but rather is a phony
+            //       path serving as a "container" for the following components:
+            //          - Base file's file-id
+            //          - Base file's commit instant
+            //          - Partition path
             return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
           default:
             throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
         }
-      }, parallelism).stream().collect(Collectors.toList());
+      }, parallelism);
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException {
+    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
-    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) {
-      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
-    }
-    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
+    String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), baseFilePathForAppend.getParent());
+    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath);
+
+    // NOTE: Since we're rolling back an incomplete Delta Commit, it could only have appended
+    //       its block to the latest log-file
+    // TODO(HUDI-1517) use provided marker-file's path instead
+    HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,

Review comment:
    We should probably fix the marker file even for log files. If not, we have to read all log files and, from the instant time in each block header, determine exactly which log files were touched by a given commit.
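    For reference, that fallback would look roughly like the sketch below, written against Hudi's log-format reader (treat it as a sketch; exact signatures may differ across versions):
    ```java
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.table.log.HoodieLogFormat;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock;

    class LogFileInstantScan {
      // Decide whether `logFile` contains blocks written by `instantToRollback` by
      // reading every block header; this is the expensive path described above.
      static boolean wasTouchedBy(FileSystem fs, HoodieLogFile logFile, Schema schema,
                                  String instantToRollback) throws IOException {
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, logFile, schema)) {
          while (reader.hasNext()) {
            HoodieLogBlock block = reader.next();
            String blockInstant = block.getLogBlockHeader()
                .get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
            if (instantToRollback.equals(blockInstant)) {
              return true;
            }
          }
        }
        return false;
      }
    }
    ```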

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -90,42 +86,41 @@ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineCo
                 Collections.singletonList(fullDeletePath.toString()),
                 Collections.emptyMap());
           case APPEND:
+            // NOTE: This marker file-path does NOT correspond to a log-file, but rather is a phony
+            //       path serving as a "container" for the following components:
+            //          - Base file's file-id
+            //          - Base file's commit instant
+            //          - Partition path
             return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
           default:
             throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
         }
-      }, parallelism).stream().collect(Collectors.toList());
+      }, parallelism);
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException {
+    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
-    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) {
-      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
-    }
-    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
+    String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), baseFilePathForAppend.getParent());
+    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath);
+
+    // NOTE: Since we're rolling back an incomplete Delta Commit, it could only have appended
+    //       its block to the latest log-file
+    // TODO(HUDI-1517) use provided marker-file's path instead
+    HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,

Review comment:
    In multi-writer use-cases this may not hold, unfortunately. One writer could fail midway while other writers are still running, and after a timeout some other writer will trigger the rollback of the previously failed commit. By that time, more log blocks/files could have been added.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -90,42 +86,41 @@ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineCo
                 Collections.singletonList(fullDeletePath.toString()),
                 Collections.emptyMap());
           case APPEND:
+            // NOTE: This marker file-path does NOT correspond to a log-file, but rather is a phony
+            //       path serving as a "container" for the following components:
+            //          - Base file's file-id
+            //          - Base file's commit instant
+            //          - Partition path
             return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
           default:
             throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
         }
-      }, parallelism).stream().collect(Collectors.toList());
+      }, parallelism);
     } catch (Exception e) {
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException {
+    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
-    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) {
-      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
-    }
-    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
+    String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), baseFilePathForAppend.getParent());
+    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath);
+
+    // NOTE: Since we're rolling back an incomplete Delta Commit, it could only have appended
+    //       its block to the latest log-file
+    // TODO(HUDI-1517) use provided marker-file's path instead
+    HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get();
+
+    // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended,
+    //       therefore we simply stub this value.
+    Map<String, Long> logFilesWithBlocsToRollback =

Review comment:
    But how do we reconcile this -1 while merging multiple metadata records?
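    To make the question concrete: once a -1 stub enters the stream of size records for a log file, a merge has no sound answer. A sketch (`combine` and both parameters are hypothetical):
    ```java
    class FileSizeMerge {
      // `prevSize` accumulated from earlier delta-commit records for this log file;
      // `rollbackSize` is the stubbed value written by this code path.
      static long combine(long prevSize, long rollbackSize) {
        if (rollbackSize == -1L) {
          // Is the file now 0 bytes? Still prevSize bytes? Unknown without
          // re-listing the file system, which defeats the metadata table.
          return prevSize; // an arbitrary choice; neither option is provably right
        }
        return rollbackSize; // a real full size would be authoritative
      }
    }
    ```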



