You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/11 19:16:30 UTC

[GitHub] [hudi] bvaradar commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

bvaradar commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453225662



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+      Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism ?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or fileSizeInBytes?

Review comment:
       @bhasudha : totalWriteBytes = fileSizeInBytes for base files (parquet). For log files, this is not the case as we use a heuristic to estimated the bytes written per delta-commit. 
   
   Getting the actual file size would require a RPC call and will be costly here. Also, looking at where the file size is useful in read-path, it is only needed for combining and splitting file-splits which is based on base-file for Realtime. So, it should be fine to use the metadata stat stat.getTotalWriteBytes() with a change.
   
   As same log file can appear in multiple delta commits (for HDFS and other file-systems supporting appends), the logic below needs to handle that, You can simply have the cumulative write-bytes for each log file appearing across delta-commits to get a better approximate size of log files. 
   

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+      Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism ?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or fileSizeInBytes?
+            FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false, 0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).add(fs);

Review comment:
       need to handle duplicate log files here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org