You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/10/09 19:46:17 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3203: [HUDI-2086] Redo the logical of mor_incremental_view for hive

nsivabalan commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r725518350



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in split to track matching log file and base files.
+ * Hence, this weird looking class which tracks an log/base file split
+ */
+public class BaseFileWithLogsSplit extends FileSplit {
+  // a flag to mark this split is produced by incremental query or not.
+  private boolean belongToIncrementalSplit = false;
+  // the log files of this split.
+  private List<String> deltaLogPaths = new ArrayList<>();
+  // max commit time of current split.
+  private String maxCommitTime = "";
+  // the basePath of current hoodie table.
+  private String basePath = "";
+  // the base file of this split.
+  private String baseFilePath = "";

Review comment:
       we don't need bootstrap related variable in this class is it? 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils

Review comment:
       will listAffectedFilesForCommits() also return log files or just base files ? I assume all files and so it could include log files too if touched by a commit. can you confirm please?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -161,6 +185,21 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  public static RealtimeBootstrapBaseFileSplit createRealimeBootstrapBaseFileSplit(
+      BootstrapBaseFileSplit split, String basePath, List<String> deltaLogPaths, String maxInstantTime) {
+    try {
+      String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
+          .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
+      String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo())
+          .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
+      FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(),

Review comment:
       there are two constructor in Filesplit based on whether inMemoryHosts is null or not. don't we need call diff constructor here? 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath

Review comment:
       why just current partition path. can't there be more than 1 partition paths of interest? can you help me understand whats going on here

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();

Review comment:
       can we create this as Path object and use toString where ever required rather than constructing new Path(basePath) everywhere. 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
+          }
+          RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+          // try to set bootstrapfileStatus
+          if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+            fileStatus.setBootStrapFileStatus(baseFileStatus);
+          }
+          result.add(fileStatus);
+        }
+        // add file group which has only logs.
+        if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
+          List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
+          if (logFileStatus.size() > 0) {
+            RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
+            fileStatus.setBelongToIncrementalFileStatus(true);
+            fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+            fileStatus.setMaxCommitTime(maxCommitTime);
+            fileStatus.setBasePath(basePath);
+            result.add(fileStatus);
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Error obtaining data file/log file grouping ", e);
+      }
+    });
+    return result;
+  }
+
+  @Override
+  protected boolean includeLogFilesForSnapShotView() {
+    return true;
+  }
+
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    if (filename instanceof PathWithLogFilePath) {
+      return ((PathWithLogFilePath)filename).splitable();
+    }
+    return super.isSplitable(fs, filename);
+  }
+
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+    if (file instanceof PathWithLogFilePath) {
+      return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null);
+    }
+    return super.makeSplit(file, start, length, hosts);
+  }
+
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
+    if (file instanceof PathWithLogFilePath) {
+      return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts);
+    }
+    return super.makeSplit(file, start, length, hosts, inMemoryHosts);
+  }
+
+  private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) {
+    if (path.getPathWithBootstrapFileStatus() == null) {

Review comment:
       we could expose a method like isBootstrapFilePath() or something  in PathWithLogFilePath rather than checking for null here. 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseFileWithLogsSplit extends FileSplit {

Review comment:
       can we avoid the word "weird" in the documentation please. 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();

Review comment:
       this method is too long. Can we try to make it concise (may be slice into some smaller methods). 
   anyways, since we do lot of things here, can we document in java docs (line 95) as to what all we do in this method for maintanence. 
   
   Here is what I could infer. please correct it and add to java docs. 
   
   1. Get list of commits to be fetched based on start commit and max commits (for snapshot max commits is -1)
   2. Get list of affected file status for these commits of interest
   3. Construct HoodieTableFileSystemVeiw based on this affected file status. 
      a. Filter for affected partitions based in inputPaths
      b. Get list of fileGroups based on affected partitions and fsView.getAllFileGroups
   4. Set input paths based on filtered affected partition paths. chances that among original input paths passed to this method, some partitions did not have commits as part of the trimmed down list of commits and hence we need this step. 
   5. Find candidate fileStatus. // I don't understand what are we doing here. can you help me with it.
   6. for every file group from 3(b)
          Get 1st available base file from all file slices. If part of candidate file status, construct RealtimeFileStatus and add it to result along with log files. 
          If file group just has log files w/o base file and part of candidate file status, construct RealtimeFileStatus and add it to result 
   
   
   
   

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -102,97 +102,107 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
 
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
+    // deal with DeltaOnlySplits
+    if (logReader.isPresent()) {
+      return logReader.get().next(aVoid, arrayWritable);
+    }
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
     // with a new block of values
-    while (this.parquetReader.next(aVoid, arrayWritable)) {
-      if (!deltaRecordMap.isEmpty()) {
-        String key = arrayWritable.get()[recordKeyIndex].toString();
-        if (deltaRecordMap.containsKey(key)) {
-          // mark the key as handled
-          this.deltaRecordKeys.remove(key);
-          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-          // deltaRecord may not be a full record and needs values of columns from the parquet
-          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          if (!rec.isPresent()) {
-            continue;
+    boolean result = this.parquetReader.next(aVoid, arrayWritable);
+    if (!result) {
+      // if the result is false, then there are no more records
+      return false;
+    }
+    if (!deltaRecordMap.isEmpty()) {
+      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
+      // would be true until we have a way to index logs too)

Review comment:
       guess if someone uses hbase index, it may not be the case. 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -102,97 +102,107 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
 
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
+    // deal with DeltaOnlySplits
+    if (logReader.isPresent()) {
+      return logReader.get().next(aVoid, arrayWritable);
+    }
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
     // with a new block of values
-    while (this.parquetReader.next(aVoid, arrayWritable)) {
-      if (!deltaRecordMap.isEmpty()) {
-        String key = arrayWritable.get()[recordKeyIndex].toString();
-        if (deltaRecordMap.containsKey(key)) {
-          // mark the key as handled
-          this.deltaRecordKeys.remove(key);
-          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-          // deltaRecord may not be a full record and needs values of columns from the parquet
-          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          if (!rec.isPresent()) {
-            continue;
+    boolean result = this.parquetReader.next(aVoid, arrayWritable);
+    if (!result) {
+      // if the result is false, then there are no more records
+      return false;

Review comment:
       if a Realtime split consists of a base file and few log files, we can't return here? or am I misunderstanding something

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -336,6 +336,11 @@ public static String getFileExtensionFromLog(Path logPath) {
     return matcher.group(3);
   }
 
+  public static String getLogFileExtension(String fullName) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(fullName);

Review comment:
       can we reuse variable for ".log"
   HoodieLogFile.DELTA_EXTENSION

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RealtimeFileStatus extends FileStatus {

Review comment:
       java docs.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file status in current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
+          }
+          RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));

Review comment:
       I see we call f.getAllFileSlices() at line 147, where as here we call f.getLatestFileSlice(). is that intentional? 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus  = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());

Review comment:
       whats the scenario where k.isEmpty() will be true? is it non partitioned case?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealTimeMergedRecordReader.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.Iterator;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Record Reader implementation to read avro data, to support inc queries.
+ */
+public class RealTimeMergedRecordReader extends AbstractRealtimeRecordReader
+    implements RecordReader<NullWritable, ArrayWritable> {
+  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+  private final Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator;
+  private ArrayWritable valueObj;
+
+  private int end;
+  private int offset;
+
+  public RealTimeMergedRecordReader(RealtimeSplit split, JobConf job, HoodieMergedLogRecordScanner logRecordScanner) {
+    super(split, job);
+    this.logRecordScanner = logRecordScanner;
+    this.end = logRecordScanner.getRecords().size();
+    this.iterator = logRecordScanner.iterator();
+    this.valueObj = new ArrayWritable(Writable.class, new Writable[getHiveSchema().getFields().size()]);
+  }
+
+  private Option buildGenericRecord(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.getData().getInsertValue(getWriterSchema());
+    } else {
+      return record.getData().getInsertValue(getReaderSchema());
+    }
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws IOException {

Review comment:
       have asked a clarification question in RealtimeCompactedRecordReader. will review this once I get those clarified. 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);

Review comment:
       can you help me understand, if there was a replace commit made, where exactly the filtering will happen? does findInstantsAfter will take care of that? or is the code is else where?
   let me illustrate. 
   lets say these are the commits.
   c1, c2, replaceCommit3, c4, c5
   
   If someone does incremental with start commit c2 and asks for 1 commit, we will return just c2. 
   if someone queries for incremental with start commit c2 and asks for 3 commits, we should return just replaceCommit3 and c4 right? 
   do I make sense or am I getting it wrong ?
   

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * keep the logical of mor_incr_view as same as spark datasource.
+   * to do: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {

Review comment:
       inputPaths can only contain partition paths right and not any log files? is my understanding right.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -52,16 +49,19 @@
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
 
-  private final Set<String> deltaRecordKeys;
   private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-  private Iterator<String> deltaItr;
+  private Option<RealTimeMergedRecordReader> logReader;
 
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
-    this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
+    HoodieMergedLogRecordScanner hoodieMergedLogRecordScanner = getMergedLogRecordScanner();
+    this.deltaRecordMap = hoodieMergedLogRecordScanner.getRecords();
+    this.logReader = Option.empty();
+    if (FSUtils.isLogFile(split.getPath())) {

Review comment:
       help me clarify something. 
   A RealtimeSplit can be any of the 3 right?
   a. just base file.
   b. base file and few log files.
   c. just log files.
   

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -52,16 +49,19 @@
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
 
-  private final Set<String> deltaRecordKeys;
   private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-  private Iterator<String> deltaItr;
+  private Option<RealTimeMergedRecordReader> logReader;
 
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
-    this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
+    HoodieMergedLogRecordScanner hoodieMergedLogRecordScanner = getMergedLogRecordScanner();
+    this.deltaRecordMap = hoodieMergedLogRecordScanner.getRecords();
+    this.logReader = Option.empty();
+    if (FSUtils.isLogFile(split.getPath())) {

Review comment:
       will review this class once I get clarification on above as its key for my review. 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -52,16 +49,19 @@
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
 
-  private final Set<String> deltaRecordKeys;
   private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-  private Iterator<String> deltaItr;
+  private Option<RealTimeMergedRecordReader> logReader;
 
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
-    this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
+    HoodieMergedLogRecordScanner hoodieMergedLogRecordScanner = getMergedLogRecordScanner();
+    this.deltaRecordMap = hoodieMergedLogRecordScanner.getRecords();
+    this.logReader = Option.empty();
+    if (FSUtils.isLogFile(split.getPath())) {

Review comment:
       and only incase of (c), FSUtils.isLogFile(split.getPath()) will return true? 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -99,7 +101,32 @@
         }
       }
       Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-      partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+      // deal with incremental query
+      candidateFileSplits.stream().forEach(s -> {
+        try {

Review comment:
       guess what he meant is, we are iterating through candidateFileSplits twice, once to populate rtSplits and another time to populate filteredPartitionsToParquetSplits. can we see if we can do it in one iteration? @dannyhchen : correct me if you meant something else.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org