Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/11 00:21:34 UTC

[GitHub] [hudi] bhasudha opened a new pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

bhasudha opened a new pull request #1817:
URL: https://github.com/apache/hudi/pull/1817


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   This commit addresses two issues:
         1. Honors the end time if it is earlier than the most recent completed commit time.
         2. Does not require a base parquet file to be present when the begin and end times match only delta commits.
   
       To achieve this:
       - Created a separate FileSplit for handling incremental queries
       - Created a new RecordReader to handle the new FileSplit
       - Added a FileSlice Scanner to scan the files in a file slice: it first takes the base parquet file (if present) and applies the merged records from all log files in that slice on top; if no base file is present, scanning returns the merged records from the log files alone (a condensed sketch follows this list)
       - Modified HoodieParquetRealtimeInputFormat to switch to this HoodieMORIncrementalFileSplit and HoodieMORIncrementalRecordReader from getSplits(..) and getRecordReader(..) in the case of incremental queries
       - Includes a unit test covering different incremental queries
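   For illustration, a condensed sketch of the per-slice merge described above (this is not the actual HoodieMergedFileSlicesScanner code from this PR; readBaseRecordsInto and buildLogScanner are hypothetical placeholders for the reader plumbing):

   ```java
   // Illustrative merge order inside one file slice; helper methods are hypothetical.
   private void scanFileSlice(FileSlice slice, Map<String, ArrayWritable> mergedRecords,
                              Schema readerSchema) throws IOException {
     // 1. If a base parquet file exists, load its records first, keyed by record key.
     if (slice.getBaseFile().isPresent()) {
       readBaseRecordsInto(slice.getBaseFile().get(), mergedRecords); // hypothetical helper
     }
     // 2. Overlay the merged log records; with no base file, these alone form the result.
     HoodieMergedLogRecordScanner logScanner = buildLogScanner(slice); // hypothetical helper
     for (Map.Entry<String, HoodieRecord<? extends HoodieRecordPayload>> entry
         : logScanner.getRecords().entrySet()) {
       Option<IndexedRecord> merged = entry.getValue().getData().getInsertValue(readerSchema);
       if (merged.isPresent()) {
         mergedRecords.put(entry.getKey(),
             (ArrayWritable) avroToArrayWritable((GenericRecord) merged.get(), readerSchema));
       } else {
         // An empty payload indicates the record key was deleted.
         mergedRecords.remove(entry.getKey());
       }
     }
   }
   ```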
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [x] Has a corresponding JIRA in PR title & commit
    
    - [x] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-668300865


   cc @satishkotha to review and suggest what to do for the 0.6.0 timeline


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-665979938


   Yes, this is trying to fix things for Hive primarily. We can take a separate approach for Spark incremental queries.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-809972020


   @garyli1019 Is this PR addressed as part of https://github.com/apache/hudi/pull/1938 or should we revive this ? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bhasudha commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r465927669



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -165,11 +261,15 @@ private static void cleanProjectionColumnIds(Configuration conf) {
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
     // sanity check
-    ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit,
+    ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit || split instanceof HoodieMORIncrementalFileSplit,

Review comment:
       @satishkotha  There are a few requirements we need to satisfy in order to support this in HoodieRealtimeFileSplit:
   
   - The start and end times should be honored by the incremental query. If an end time is not specified, it can be assumed to be the minimum of (the commit implied by maxNumberOfCommits, mostRecentCommit). Currently this is not happening as intended (a minimal sketch of this rule appears below).
   - The base file and log files can each be optional. This can happen when the boundaries of the incremental query filter are such that the start commit time matches a log file and/or the end commit time matches only the base file across file slices, or when the incremental query touches a FileSlice that has not been compacted yet.
   
   When I initially started, I was not sure how large the refactor, and the testing needed to achieve both of the above requirements in the same HoodieRealtimeFileSplit, would be. It would also require regression testing of snapshot queries in all query engines, plus the new incremental query path in all query engines. So, instead of impacting the snapshot query code path that is running fine, I conservatively branched out to make these changes apply only to the incremental query path, intending to consolidate them in the long term after stabilizing and gaining more confidence.
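   For illustration, a minimal sketch of the end-time rule in the first bullet (the readEndCommitTime helper is hypothetical and not part of this PR; instant timestamps sort lexicographically, so plain string comparison suffices):

   ```java
   // Illustrative only; readEndCommitTime is a hypothetical helper.
   String endTime = readEndCommitTime(job, tableName); // null when no end time is specified
   List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
       .getInstants().collect(Collectors.toList());
   if (endTime != null) {
     // Honor the requested end time by dropping newer instants; otherwise the batch is
     // already capped by maxCommits and the most recent completed commit.
     commitsToCheck = commitsToCheck.stream()
         .filter(instant -> instant.getTimestamp().compareTo(endTime) <= 0)
         .collect(Collectors.toList());
   }
   ```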




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-810749728


   @garyli1019 Thanks for the context. @bhasudha Are you able to revive and rebase this PR ? Once you do that, I can help land this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bhasudha commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453131365



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergedFileSlicesScanner.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP;
+import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED;
+import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;
+import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH;
+import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP;
+import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.arrayWritableToString;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.avroToArrayWritable;
+
+/**
+ * Scans through all files in a file slice. Constructs a map of records from Parquet file if present. Constructs merged
+ * log records if log files are present in the slice. Merges records from log files on top of records from base parquet
+ * file.
+ *
+ */
+public class HoodieMergedFileSlicesScanner {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieMergedFileSlicesScanner.class);
+
+  private final transient MapredParquetInputFormat mapredParquetInputFormat;
+  private final HoodieMORIncrementalFileSplit split;
+  private final List<FileSlice> fileSlices;
+
+  // Final map of compacted/merged records
+  // TODO change map to external spillable map. But ArrayWritable is not implementing Serializable

Review comment:
       We are merging parquet and log files and need to assume either of the files can be absent. I kept the Map interface as <String, ArrayWritable>. But ArrayWritable does not implement Serializable, which prevents the use of ExternalSpillableMap. Appreciate any ideas.
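   For illustration, one possible direction (not code from this PR): keep the merged values as Avro bytes, which are spill-friendly, and convert them to ArrayWritable only when handing records to the reader.

   ```java
   // Illustrative only: serialize a merged Avro record to bytes so it could be stored in a
   // disk-spillable structure; read back with GenericDatumReader + BinaryDecoder.
   static byte[] recordToBytes(GenericRecord record, Schema schema) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
     new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
     encoder.flush();
     return out.toByteArray();
   }
   ```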




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-662836052


   > @garyli1019 are you talking about corner cases not handled in this PR? can you review the PR once for intended functionality? I am trying to see if this can help MOR/Incremental query on spark SQL in some form.
   
   @vinothchandar I think the Spark Datasource will use a different approach. IIUC, this PR is trying to solve the case where the incremental query starts from an uncompacted delta commit, which doesn't have a base file for some file groups and therefore misses the log records. For Spark Datasource, we can create a `HoodieFileSplit` without `baseFile` and read logs only. I am not sure whether this could be done in the `HoodieRealtimeFileSplit` without the extra handling for `HoodieMORIncrementalFileSplit`.
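   For illustration, a rough sketch of that idea (field names are assumptions, not the actual HoodieRealtimeFileSplit or HoodieMORIncrementalFileSplit fields):

   ```java
   // Illustrative split shape where the base file is optional and only log files may exist.
   class LogOnlyCapableSplitSketch {
     private final org.apache.hudi.common.util.Option<String> baseFilePath; // absent when only logs exist
     private final java.util.List<String> deltaLogPaths;                    // log files to merge
     private final String maxCommitTime;                                    // end instant honored by the query

     LogOnlyCapableSplitSketch(org.apache.hudi.common.util.Option<String> baseFilePath,
                               java.util.List<String> deltaLogPaths, String maxCommitTime) {
       this.baseFilePath = baseFilePath;
       this.deltaLogPaths = deltaLogPaths;
       this.maxCommitTime = maxCommitTime;
     }
   }
   ```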


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bhasudha commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bhasudha commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-656948491


   @garyli1019 Please take a look at this and provide feedback


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #1817: [HUDI-651] Fix incremental queries in hive for MOR tables

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-845989221


   @garyli1019 and @n3nash: can we have an owner for this? If not, I can take it up; I haven't worked on the incremental path before, but I am willing to expand my knowledge :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-809978441


   > @garyli1019 Is this PR addressed as part of #1938 or should we revive this ?
   
   Hi @n3nash, #1938 adds support for MOR incremental queries on Spark DataSource. This PR seems to aim at fixing Hive.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-813859803


   @bhasudha I will try to rebase this PR and get it to a working state tomorrow; if you are working on it already, please let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bhasudha commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453131885



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.

Review comment:
       This is an assumption for now, since we are not touching snapshot queries and are adding a new path to handle incremental queries when the list of incremental tables is not empty. In the future this might have to change, or there may be a better way to apply this constraint than relying simply on the incremental tables.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] satishkotha commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r465908401



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -165,11 +261,15 @@ private static void cleanProjectionColumnIds(Configuration conf) {
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
     // sanity check
-    ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit,
+    ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit || split instanceof HoodieMORIncrementalFileSplit,

Review comment:
       High-level question: is it possible to make baseFile optional in HoodieRealtimeFileSplit instead of creating the new class HoodieMORIncrementalFileSplit? We may also have to make changes in the RecordReader classes if baseFile is not present.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-662830116


   @garyli1019 are you talking about corner cases not handled in this PR? can you review the PR once for intended functionality? I am trying to see if this can help MOR/Incremental query on spark SQL in some form. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bhasudha commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453132095



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+      Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism ?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or fileSizeInBytes?

Review comment:
       @bvaradar  @n3nash  can you help clarify this ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bhasudha commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bhasudha commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453132014



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+      Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism ?

Review comment:
       There is scope for parallelizing the listing here, but it should not be an immediate blocker.
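   For illustration, one way this could be parallelized later (a sketch only, not part of this PR; toFileStatuses is a hypothetical helper):

   ```java
   // Illustrative only: read per-commit metadata in parallel and collect file statuses per partition.
   Map<String, List<FileStatus>> partitionToFileStatusesMap = new ConcurrentHashMap<>();
   commitsToCheck.parallelStream().forEach(commit -> {
     try {
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
           timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
       metadata.getPartitionToWriteStats().forEach((partition, stats) ->
           partitionToFileStatusesMap
               .computeIfAbsent(partition, p -> Collections.synchronizedList(new ArrayList<>()))
               .addAll(toFileStatuses(stats))); // hypothetical helper building FileStatus objects
     } catch (IOException e) {
       throw new HoodieIOException("Could not read commit metadata for " + commit, e);
     }
   });
   ```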




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #1817: [HUDI-651] Fix incremental queries in hive for MOR tables

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-1025355020


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "304061316e3f633fef102f9f217c3e446ce14156",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "304061316e3f633fef102f9f217c3e446ce14156",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 304061316e3f633fef102f9f217c3e446ce14156 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #1817: [HUDI-651] Fix incremental queries in MOR tables

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#discussion_r453225662



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+      Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism ?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or fileSizeInBytes?

Review comment:
       @bhasudha : totalWriteBytes = fileSizeInBytes for base files (parquet). For log files, this is not the case, as we use a heuristic to estimate the bytes written per delta commit.
   
   Getting the actual file size would require an RPC call and would be costly here. Also, looking at where the file size is used in the read path, it is only needed for combining and splitting file splits, which is based on the base file for Realtime. So it should be fine to use the metadata stat stat.getTotalWriteBytes(), with one change.
   
   Since the same log file can appear in multiple delta commits (for HDFS and other file systems supporting appends), the logic below needs to handle that. You can simply accumulate the write-bytes for each log file appearing across delta commits to get a better approximate size for the log files (sketched below).
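   For illustration, a sketch of that suggestion (not the PR code; assumed to live inside listStatusForAffectedPartitions, which already declares IOException):

   ```java
   // Illustrative only: sum write-bytes per log-file path across delta commits so each
   // unique file is listed once with a cumulative size estimate.
   Map<String, Long> pathToCumulativeBytes = new HashMap<>();
   for (HoodieInstant commit : commitsToCheck) {
     HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
         timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
     for (Map.Entry<String, List<HoodieWriteStat>> entry : metadata.getPartitionToWriteStats().entrySet()) {
       for (HoodieWriteStat stat : entry.getValue()) {
         if (stat.getPath() != null) {
           pathToCumulativeBytes.merge(stat.getPath(), stat.getTotalWriteBytes(), Long::sum);
         }
       }
     }
   }
   // Each unique path then becomes one FileStatus whose length is the cumulative estimate.
   ```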
   

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -62,16 +81,93 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
+    // is this an incremental query
+    List<String> incrementalTables = getIncrementalTableNames(Job.getInstance(job));
+    if (!incrementalTables.isEmpty()) {
+      //TODO For now assuming the query can be either incremental or snapshot and NOT both.
+      return getSplitsForIncrementalQueries(job, incrementalTables);
+    }
+
     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
 
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
-  @Override
-  public FileStatus[] listStatus(JobConf job) throws IOException {
-    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit
-    // timeline.
-    return super.listStatus(job);
+  protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List<String> incrementalTables) throws IOException {
+    InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
+    Map<String, HoodieTableMetaClient> tableMetaClientMap = inputPathHandler.getTableMetaClientMap();
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (String table : incrementalTables) {
+      HoodieTableMetaClient metaClient = tableMetaClientMap.get(table);
+      if (metaClient == null) {
+        /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths
+         * in the jobConf
+         */
+        continue;
+      }
+      String tableName = metaClient.getTableConfig().getTableName();
+      Path basePath = new Path(metaClient.getBasePath());
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName);
+      // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+      Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName);
+      LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs);
+      List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+          .getInstants().collect(Collectors.toList());
+
+      Map<String, List<FileStatus>> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline);
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (List<FileStatus> statuses: partitionToFileStatusesMap.values()) {
+        fileStatuses.addAll(statuses);
+      }
+      LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size()
+          + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: "
+          + fileStatuses.size());
+      FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+      List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses);
+
+      // Iterate partitions to create splits
+      partitionToFileStatusesMap.keySet().forEach(path -> {
+        // create an Incremental Split for each file group.
+        fsView.getAllFileGroups(path)
+            .forEach(
+                fileGroup -> splits.add(
+                    new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1))
+            ));
+      });
+    }
+    Log.info("Total splits generated: " + splits.size());
+    return splits.toArray(new InputSplit[0]);
+  }
+
+  private Map<String, List<FileStatus>> listStatusForAffectedPartitions(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // Extract files touched by these commits.
+    // TODO This might need to be done in parallel like listStatus parallelism ?
+    HashMap<String, List<FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    for (HoodieInstant commit: commitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>());
+        }
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            //TODO Should the length of file be totalWriteBytes or fileSizeInBytes?
+            FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false, 0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).add(fs);

Review comment:
       need to handle duplicate log files here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on pull request #1817: [HUDI-651] Fix incremental queries in hive for MOR tables

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on pull request #1817:
URL: https://github.com/apache/hudi/pull/1817#issuecomment-846376670


   @nsivabalan I am too busy landing Hudi at work, so I may not be able to work on this in the near term. Please feel free to pick this up if you are interested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org