Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/04 16:00:11 UTC

[hudi] 05/06: [HUDI-1312] [RFC-15] Support for metadata listing for snapshot queries through Hive/SparkSQL (#2366)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1a0579ca7df8d3ab83775d356f34f5b432b05176
Author: rmpifer <rm...@umich.edu>
AuthorDate: Tue Dec 29 13:09:55 2020 -0800

    [HUDI-1312] [RFC-15] Support for metadata listing for snapshot queries through Hive/SparkSQL (#2366)
    
    Co-authored-by: Ryan Pifer <ry...@amazon.com>
---
 .../apache/hudi/hadoop/HoodieHFileInputFormat.java | 12 ++---
 .../hudi/hadoop/HoodieParquetInputFormat.java      | 13 ++----
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  | 53 ++++++++++++++++------
 .../utils/HoodieRealtimeInputFormatUtils.java      | 20 +++++++-
 4 files changed, 68 insertions(+), 30 deletions(-)
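Note for readers following the diff: the new reader-side switches consumed below come from HoodieMetadataConfig. As a minimal, hypothetical sketch (the "hoodie.metadata.enable" and "hoodie.metadata.validate" key strings are assumptions based on HoodieMetadataConfig's METADATA_ENABLE_PROP / METADATA_VALIDATE_PROP, not part of this commit), a Hive/MR job would opt in like this:

    import org.apache.hadoop.mapred.JobConf;

    public class MetadataListingExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Read file listings from the Hudi metadata table instead of listing the file system.
        job.setBoolean("hoodie.metadata.enable", true);
        // Optionally cross-check metadata-based listings against a direct file system listing.
        job.setBoolean("hoodie.metadata.validate", true);
      }
    }
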

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
index 1747888..048402a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -97,13 +96,10 @@ public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayW
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, HoodieFileFormat.HFILE.getFileExtension(),
-              tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+      LOG.info("Found a total of " + groupedPaths.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
         List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 8b89949..d51aff0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.hadoop;
 
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -109,13 +108,11 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
-      FileStatus[] fileStatuses = super.listStatus(job);
-      Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
-              HoodieFileFormat.PARQUET.getFileExtension(), tableMetaClientMap.values());
-      LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
+      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+      LOG.info("Found a total of " + groupedPaths.size() + " groups");
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
+        HoodieTableMetaClient metaClient = entry.getKey();
-        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
+        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, metaClient, entry.getValue());
         if (result != null) {
           returns.addAll(result);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 9b0fbf9..cf7da54 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
@@ -62,6 +63,11 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
 public class HoodieInputFormatUtils {
 
   // These positions have to be deterministic across all tables
@@ -391,27 +397,48 @@ public class HoodieInputFormatUtils {
     return grouped;
   }
 
+  public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
+          Collection<HoodieTableMetaClient> metaClientList,
+          List<Path> snapshotPaths
+  ) {
+    Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
+    metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
+    for (Path path : snapshotPaths) {
+      // Find meta client associated with the input path
+      metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath()))
+              .forEach(metaClient -> grouped.get(metaClient).add(path));
+    }
+    return grouped;
+  }
+
   /**
-   * Filters data files for a snapshot queried table.
+   * Filters data files under the given paths for a snapshot queried table.
    * @param job
-   * @param metadata
-   * @param fileStatuses
+   * @param metaClient meta client for the table whose paths are being filtered
+   * @param paths snapshot paths (partition paths) to resolve against the file system view
    * @return
    */
   public static List<FileStatus> filterFileStatusForSnapshotMode(
-      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) throws IOException {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+          JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metaClient);
     }
-    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
-    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+
+    boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+    HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+
+    List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+    for (Path p : paths) {
+      String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
+      List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
+      filteredBaseFiles.addAll(matched);
+    }
+
+    LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
     List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
+    for (HoodieBaseFile filteredFile : filteredBaseFiles) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
       }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index c8a0d7f..1b272ae 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -53,6 +54,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
@@ -63,13 +69,25 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     // TODO(vc): Should we handle also non-hoodie splits here?
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
 
+    boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+    // Cache the file system view per meta client so the metadata table is instantiated only once per table.
+    // This also helps plain file listing when a partition path is listed twice, as file groups are already loaded.
+    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<>();
     // for all unique split parents, obtain all delta files based on delta commit timeline,
     // grouped on file id
     List<InputSplit> rtSplits = new ArrayList<>();
     partitionsToParquetSplits.keySet().forEach(partitionPath -> {
       // for each partition path obtain the data & log file groupings, then map back to inputsplits
       HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
-      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      if (!fsCache.containsKey(metaClient)) {
+        // Create the view once per table; it is backed by the metadata table when enabled above.
+        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+                useFileListingFromMetadata, verifyFileListing);
+        fsCache.put(metaClient, fsView);
+      }
+      HoodieTableFileSystemView fsView = fsCache.get(metaClient);
+
       String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
 
       try {
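The containsKey/put sequence above could equally be written with Map.computeIfAbsent. A short sketch of the same per-table caching idea (variable names as in the hunk above, assumed to be in scope):

    // One lazily-created in-memory view per table, so the metadata table is
    // instantiated at most once per meta client.
    HoodieTableFileSystemView fsView = fsCache.computeIfAbsent(metaClient, client ->
        FileSystemViewManager.createInMemoryFileSystemView(client, useFileListingFromMetadata, verifyFileListing));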