Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/05 23:48:24 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3360: [HUDI-2243] Support Time Travel Query For Hoodie Table

nsivabalan commented on a change in pull request #3360:
URL: https://github.com/apache/hudi/pull/3360#discussion_r683854325



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
##########
@@ -194,4 +198,26 @@ object HoodieSqlUtils extends SparkAdapterSupport {
 
   def isEnableHive(sparkSession: SparkSession): Boolean =
     "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+
+  /**
+   * Convert different query instant time format to the commit time format.
+   * Currently we support three kinds of instant time format for time travel query:
+   * 1、yyyy-MM-dd HH:mm:ss
+   * 2、yyyy-MM-dd
+   *   This will convert to 'yyyyMMdd000000'.
+   * 3、yyyyMMddHHmmss
+   */
+  def formatQueryInstant(queryInstant: String): String = {
+    if (queryInstant.length == 19) { // for yyyy-MM-dd HH:mm:ss

Review comment:
       It would be good to do pattern matching here instead of relying on the string length. 
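   For example, a minimal sketch of regex-based dispatch (written in Java here purely for illustration; the real method lives in HoodieSqlUtils.scala, and the class and constant names below are made up):

   ```
   import java.util.regex.Pattern;

   public final class QueryInstantFormatSketch {
     // One pattern per supported input format, instead of branching on string length.
     private static final Pattern DATE_TIME_PATTERN = Pattern.compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}");
     private static final Pattern DATE_PATTERN = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");
     private static final Pattern COMMIT_TIME_PATTERN = Pattern.compile("\\d{14}");

     public static String formatQueryInstant(String queryInstant) {
       if (DATE_TIME_PATTERN.matcher(queryInstant).matches()) {
         // yyyy-MM-dd HH:mm:ss -> yyyyMMddHHmmss
         return queryInstant.replaceAll("[- :]", "");
       } else if (DATE_PATTERN.matcher(queryInstant).matches()) {
         // yyyy-MM-dd -> yyyyMMdd000000 (start of that day)
         return queryInstant.replaceAll("-", "") + "000000";
       } else if (COMMIT_TIME_PATTERN.matcher(queryInstant).matches()) {
         // Already in the commit time format yyyyMMddHHmmss
         return queryInstant;
       } else {
         throw new IllegalArgumentException("Unsupported query instant: " + queryInstant);
       }
     }
   }
   ```

   This also rejects malformed input explicitly instead of treating any 19-character string as a date-time.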

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -447,6 +447,25 @@ protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
     }
   }
 
+  @Override
+  public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+      return fetchAllStoredFileGroups()

Review comment:
       I see an opportunity for code re-use between this and getLatestBaseFilesBeforeOrOn(String partitionStr, String maxCommitTime) (lines 470 to 486). 
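   For example, something along these lines (a sketch only; the private helper is made up, the bootstrap/replace handling of the existing method is omitted, and it assumes HoodieFileGroup#getAllBaseFiles, HoodieBaseFile#getCommitTime and HoodieTimeline.compareTimestamps with LESSER_THAN_OR_EQUALS, as used elsewhere in this class):

   ```
   @Override
   public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String maxCommitTime) {
     try {
       readLock.lock();
       return latestBaseFilesBeforeOrOn(fetchAllStoredFileGroups(), maxCommitTime);
     } finally {
       readLock.unlock();
     }
   }

   @Override
   public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionStr, String maxCommitTime) {
     try {
       readLock.lock();
       return latestBaseFilesBeforeOrOn(fetchAllStoredFileGroups(partitionStr), maxCommitTime);
     } finally {
       readLock.unlock();
     }
   }

   // Shared core: for each file group, keep the newest base file committed on or before maxCommitTime.
   private Stream<HoodieBaseFile> latestBaseFilesBeforeOrOn(Stream<HoodieFileGroup> fileGroups, String maxCommitTime) {
     return fileGroups
         .map(fileGroup -> fileGroup.getAllBaseFiles()
             .filter(baseFile -> HoodieTimeline.compareTimestamps(
                 baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime))
             .findFirst())
         .filter(java.util.Optional::isPresent)
         .map(java.util.Optional::get);
   }
   ```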

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
##########
@@ -48,9 +49,17 @@ public BaseFileHandler(Configuration conf, FileSystemViewManager viewManager) th
         .map(BaseFileDTO::fromHoodieBaseFile).map(Arrays::asList).orElse(new ArrayList<>());
   }
 
-  public List<BaseFileDTO> getLatestDataFiles(String basePath) {
-    return viewManager.getFileSystemView(basePath).getLatestBaseFiles().map(BaseFileDTO::fromHoodieBaseFile)
-        .collect(Collectors.toList());
+  public List<BaseFileDTO> getLatestDataFiles(String basePath, Option<String> maxCommitTime) {
+    if (maxCommitTime.isPresent()) {

Review comment:
       If possible, use Option.map(...).orElse(...) here instead of the if/else branches.
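   For example (a sketch only, assuming Hudi's org.apache.hudi.common.util.Option exposes map/orElseGet in the same way java.util.Optional does):

   ```
   public List<BaseFileDTO> getLatestDataFiles(String basePath, Option<String> maxCommitTime) {
     // Time-travel view when a max commit time is supplied, latest snapshot view otherwise.
     return maxCommitTime
         .map(commitTime -> viewManager.getFileSystemView(basePath).getLatestBaseFilesBeforeOrOn(commitTime))
         .orElseGet(() -> viewManager.getFileSystemView(basePath).getLatestBaseFiles())
         .map(BaseFileDTO::fromHoodieBaseFile)
         .collect(Collectors.toList());
   }
   ```

   Using orElseGet keeps the fallback lazy, so the snapshot path is only evaluated when no commit time is given.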

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -238,7 +249,12 @@ case class HoodieFileIndex(
       case (_, _) =>
         // Fetch and store latest base files and its sizes
         cachedAllInputFileSlices = partitionFiles.map(p => {
-          (p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq)
+          val fileSlices = (if (queryInstant.isDefined) {
+            fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get, true)

Review comment:
       Is it possible to use Option.map(...).getOrElse(...) here to make it nicer?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -447,6 +447,25 @@ protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
     }
   }
 
+  @Override
+  public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+      return fetchAllStoredFileGroups()

Review comment:
       In fact, we could change the signature of the existing method to 
   ```
   getLatestBaseFilesBeforeOrOn(Option<String> partitionStr, String maxCommitTime)
   ```
   and not introduce a new method.
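   A rough sketch of that single-method shape (illustrative only; callers that have a partition would pass Option.of(partition), while Option.empty() would mean "all partitions", and the per-file-group filtering is the same as in the existing partition-level method):

   ```
   @Override
   public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(Option<String> partitionStr, String maxCommitTime) {
     try {
       readLock.lock();
       Stream<HoodieFileGroup> fileGroups = partitionStr.isPresent()
           ? fetchAllStoredFileGroups(partitionStr.get())
           : fetchAllStoredFileGroups();
       return fileGroups
           .map(fileGroup -> fileGroup.getAllBaseFiles()
               .filter(baseFile -> HoodieTimeline.compareTimestamps(
                   baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime))
               .findFirst())
           .filter(java.util.Optional::isPresent)
           .map(java.util.Optional::get);
     } finally {
       readLock.unlock();
     }
   }
   ```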

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -162,28 +163,19 @@
   }
 
   // Return parquet file with a list of log files in the same file group.
-  public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
+  public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(HoodieTableMetaClient metaClient,

Review comment:
       Are the changes in this method an optimization, or is anything here actually required for this patch? 
   I am not aware of why this was designed the way it was, but there should be a reason for it. Let's take this up once we have the release, so that we can consult with Vinoth on the improvements. 
   Can we please revert the changes that are not really required for this patch? 
   I mean the per-partition metaClient related changes. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org