Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/15 02:34:09 UTC

[GitHub] [hudi] garyli1019 commented on a change in pull request #3067: [HUDI-1999] Refresh the base file view cache for WriteProfile

garyli1019 commented on a change in pull request #3067:
URL: https://github.com/apache/hudi/pull/3067#discussion_r651401752



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns the file statuses of all the incremental write paths for the given commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The Hadoop configuration
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+    return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out file paths that no longer exist; some files may have been
+        // removed by the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out crushed (zero-length) files

Review comment:
       crushed files might cause errors on the query side. How are those crushed files produced? 
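(One plausible source, an assumption not confirmed in this thread: a writer
task that creates the file but fails before flushing any bytes leaves a
zero-length file behind.) A minimal sketch of why the getLen() > 0 filter
matters, using a hypothetical local path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ZeroLengthFileDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // Hypothetical path: create the file but write no bytes to it.
        Path path = new Path("/tmp/hudi-demo/empty-base-file.parquet");
        fs.create(path).close();
        // The file passes an fs.exists() check ...
        long len = fs.getFileStatus(path).getLen();
        // ... but reports length 0, so getLen() > 0 would drop it.
        System.out.println("length = " + len); // prints: length = 0
      }
    }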

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns the file statuses of all the incremental write paths for the given commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The Hadoop configuration
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+    return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out file paths that no longer exist; some files may have been
+        // removed by the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out crushed (zero-length) files
+        .filter(fileStatus -> fileStatus.getLen() > 0)
+        .collect(Collectors.toList());
+  }
+
+  public static HoodieCommitMetadata getCommitMetadata(

Review comment:
       ditto
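For context, a minimal usage sketch of the public entry point above; the
table base path is hypothetical and an empty metadata list keeps the demo
self-contained:

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.sink.partitioner.profile.WriteProfiles;

    public class WritePathsDemo {
      public static void main(String[] args) {
        // Hypothetical base path; an empty list stands in for real commit metadata.
        List<HoodieCommitMetadata> metadataList = Collections.emptyList();
        FileStatus[] statuses = WriteProfiles.getWritePathsOfInstants(
            new Path("/tmp/hudi/t1"), new Configuration(), metadataList);
        System.out.println("files written by these commits: " + statuses.length);
      }
    }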

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns the file statuses of all the incremental write paths for the given commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The Hadoop configuration
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {

Review comment:
       add some comments about this static method?
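A sketch of the kind of comment being asked for here, worded only from what
the diff itself shows (existence check, cleaner, zero-length filter); the
exact wording is a suggestion, not from the PR:

    /**
     * Returns the file statuses of the files written in the given commit metadata.
     *
     * <p>Paths that no longer exist (e.g. already removed by the cleaner)
     * and zero-length files are filtered out.
     *
     * @param basePath Table base path
     * @param metadata The commit metadata
     * @param fs       The file system handle
     * @return the list of file statuses
     */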




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org