You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/21 08:50:32 UTC

[hudi] branch master updated: [HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 30f489e424 [HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)
30f489e424 is described below

commit 30f489e42424f4935808924045a9b2db7cca0672
Author: shaoxiong.zhan <31...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:50:22 2022 +0800

    [HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)
    
    File group in pending compaction can not be queried
    when query _ro table with spark. This commit fixes that.
    
    Co-authored-by: zhanshaoxiong <sh...@gmail.com>
    Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  1 -
 .../apache/hudi/common/model/HoodieFileGroup.java  |  4 ++
 .../table/view/AbstractTableFileSystemView.java    | 60 ++++++++--------
 .../hudi/TestQueryMergeOnReadOptimizedTable.scala  | 82 ++++++++++++++++++++++
 4 files changed, 116 insertions(+), 31 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 5910de5f1d..0aa9d40e28 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -68,7 +68,6 @@ import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
  * </ul>
  */
 public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
-
   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
 
   private final String[] partitionColumns;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 9e407aa766..9b5e8c1dd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -153,6 +153,10 @@ public class HoodieFileGroup implements Serializable {
     return Stream.empty();
   }
 
+  public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
+    return fileSlices.values().stream().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime));
+  }
+
   /**
    * Gets the latest slice - this can contain either.
    * <p>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 5818636cae..8cfd92d01f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -413,18 +413,21 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    * base-files.
    *
    * @param fileSlice File Slice
+   * @param includeEmptyFileSlice include empty file-slice
    */
-  protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
+  protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
       LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
       // Base file is filtered out of the file-slice as the corresponding compaction
       // instant not completed yet.
-      FileSlice transformed =
-          new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
       fileSlice.getLogFiles().forEach(transformed::addLogFile);
-      return transformed;
+      if (transformed.isEmpty() && !includeEmptyFileSlice) {
+        return Stream.of();
+      }
+      return Stream.of(transformed);
     }
-    return fileSlice;
+    return Stream.of(fileSlice);
   }
 
   protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
@@ -606,9 +609,9 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
-          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-          .map(this::filterBaseFileAfterPendingCompaction)
-          .map(this::addBootstrapBaseFileIfPresent);
+              .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+              .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
+              .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
@@ -627,7 +630,10 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
         return Option.empty();
       } else {
         Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
-        return fs.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        if (!fs.isPresent()) {
+          return Option.empty();
+        }
+        return Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
       }
     } finally {
       readLock.unlock();
@@ -665,13 +671,21 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
-          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
+      Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
+              .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+              .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
-        return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+                .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+                .map(this::addBootstrapBaseFileIfPresent);
       } else {
-        return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()))
-            .map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream
+                .map(sliceStream ->
+                        Option.fromJavaOptional(sliceStream
+                                .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                                .filter(slice -> !slice.isEmpty())
+                                .findFirst()))
+                .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
       readLock.unlock();
@@ -893,7 +907,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    */
   abstract Stream<BootstrapBaseFileMapping> fetchBootstrapBaseFiles();
 
-
   /**
    * Checks if partition is pre-loaded and available in store.
    *
@@ -967,7 +980,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    */
   Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
     return fetchAllStoredFileGroups(partitionPath).map(this::addBootstrapBaseFileIfPresent)
-        .map(HoodieFileGroup::getAllFileSlices).flatMap(sliceList -> sliceList);
+        .flatMap(HoodieFileGroup::getAllFileSlices);
   }
 
   /**
@@ -1003,8 +1016,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    * @param partitionPath partition-path
    */
   Stream<HoodieBaseFile> fetchAllBaseFiles(String partitionPath) {
-    return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllBaseFiles)
-        .flatMap(baseFileList -> baseFileList);
+    return fetchAllStoredFileGroups(partitionPath).flatMap(HoodieFileGroup::getAllBaseFiles);
   }
 
   /**
@@ -1023,18 +1035,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
         .map(Option::get);
   }
 
-  /**
-   * Default implementation for fetching latest file-slices for a partition path as of instant.
-   *
-   * @param partitionPath Partition Path
-   * @param maxCommitTime Instant Time
-   */
-  Stream<FileSlice> fetchLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) {
-    return fetchAllStoredFileGroups(partitionPath)
-        .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)).filter(Option::isPresent)
-        .map(Option::get);
-  }
-
   /**
    * Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
    *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
new file mode 100644
index 0000000000..3f6934d973
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
+  test("Test Query Merge_On_Read Read_Optimized table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql("set hoodie.parquet.max.file.size = 10000")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"update $tableName set price = 11 where id = 1")
+      spark.sql(s"update $tableName set price = 21 where id = 2")
+      spark.sql(s"update $tableName set price = 31 where id = 3")
+      spark.sql(s"update $tableName set price = 41 where id = 4")
+
+      // expect that all complete parquet files can be scanned
+      assertQueryResult(4, tablePath)
+
+      // async schedule compaction job
+      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
+        .collect()
+
+      // expect that all complete parquet files can be scanned with a pending compaction job
+      assertQueryResult(4, tablePath)
+
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+
+      // expect that all complete parquet files can be scanned with a pending compaction job
+      assertQueryResult(5, tablePath)
+
+      // async run compaction job
+      spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
+        .collect()
+
+      // assert that all complete parquet files can be scanned after compaction
+      assertQueryResult(5, tablePath)
+    }
+  }
+
+  def assertQueryResult(expected: Any,
+                        tablePath: String): Unit = {
+    val actual = spark.read.format("org.apache.hudi").option("hoodie.datasource.query.type", "read_optimized").load(tablePath).count()
+    assertResult(expected)(actual)
+  }
+}