You are viewing a plain text version of this content. Its canonical link was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/21 08:50:32 UTC
[hudi] branch master updated: [HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 30f489e424 [HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)
30f489e424 is described below
commit 30f489e42424f4935808924045a9b2db7cca0672
Author: shaoxiong.zhan <31...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:50:22 2022 +0800
[HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)
A file group with a pending compaction could not be queried
when querying the _ro table with Spark. This commit fixes that.
Co-authored-by: zhanshaoxiong <sh...@gmail.com>
Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 1 -
.../apache/hudi/common/model/HoodieFileGroup.java | 4 ++
.../table/view/AbstractTableFileSystemView.java | 60 ++++++++--------
.../hudi/TestQueryMergeOnReadOptimizedTable.scala | 82 ++++++++++++++++++++++
4 files changed, 116 insertions(+), 31 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 5910de5f1d..0aa9d40e28 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -68,7 +68,6 @@ import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
* </ul>
*/
public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
-
private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
private final String[] partitionColumns;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 9e407aa766..9b5e8c1dd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -153,6 +153,10 @@ public class HoodieFileGroup implements Serializable {
return Stream.empty();
}
+ public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
+ return fileSlices.values().stream().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime));
+ }
+
/**
* Gets the latest slice - this can contain either.
* <p>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 5818636cae..8cfd92d01f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -413,18 +413,21 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
* base-files.
*
* @param fileSlice File Slice
+ * @param includeEmptyFileSlice include empty file-slice
*/
- protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
+ protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
if (isFileSliceAfterPendingCompaction(fileSlice)) {
LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
// Base file is filtered out of the file-slice as the corresponding compaction
// instant not completed yet.
- FileSlice transformed =
- new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+ FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
fileSlice.getLogFiles().forEach(transformed::addLogFile);
- return transformed;
+ if (transformed.isEmpty() && !includeEmptyFileSlice) {
+ return Stream.of();
+ }
+ return Stream.of(transformed);
}
- return fileSlice;
+ return Stream.of(fileSlice);
}
protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
@@ -606,9 +609,9 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return fetchLatestFileSlices(partitionPath)
- .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
- .map(this::filterBaseFileAfterPendingCompaction)
- .map(this::addBootstrapBaseFileIfPresent);
+ .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+ .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
+ .map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
@@ -627,7 +630,10 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return Option.empty();
} else {
Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
- return fs.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+ if (!fs.isPresent()) {
+ return Option.empty();
+ }
+ return Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
}
} finally {
readLock.unlock();
@@ -665,13 +671,21 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
- .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
+ Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
+ .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+ .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
if (includeFileSlicesInPendingCompaction) {
- return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+ return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+ .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+ .map(this::addBootstrapBaseFileIfPresent);
} else {
- return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()))
- .map(this::addBootstrapBaseFileIfPresent);
+ return allFileSliceStream
+ .map(sliceStream ->
+ Option.fromJavaOptional(sliceStream
+ .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+ .filter(slice -> !slice.isEmpty())
+ .findFirst()))
+ .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
}
} finally {
readLock.unlock();
@@ -893,7 +907,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/
abstract Stream<BootstrapBaseFileMapping> fetchBootstrapBaseFiles();
-
/**
* Checks if partition is pre-loaded and available in store.
*
@@ -967,7 +980,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/
Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
return fetchAllStoredFileGroups(partitionPath).map(this::addBootstrapBaseFileIfPresent)
- .map(HoodieFileGroup::getAllFileSlices).flatMap(sliceList -> sliceList);
+ .flatMap(HoodieFileGroup::getAllFileSlices);
}
/**
@@ -1003,8 +1016,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
* @param partitionPath partition-path
*/
Stream<HoodieBaseFile> fetchAllBaseFiles(String partitionPath) {
- return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllBaseFiles)
- .flatMap(baseFileList -> baseFileList);
+ return fetchAllStoredFileGroups(partitionPath).flatMap(HoodieFileGroup::getAllBaseFiles);
}
/**
@@ -1023,18 +1035,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
.map(Option::get);
}
- /**
- * Default implementation for fetching latest file-slices for a partition path as of instant.
- *
- * @param partitionPath Partition Path
- * @param maxCommitTime Instant Time
- */
- Stream<FileSlice> fetchLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) {
- return fetchAllStoredFileGroups(partitionPath)
- .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)).filter(Option::isPresent)
- .map(Option::get);
- }
-
/**
* Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
*
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
new file mode 100644
index 0000000000..3f6934d973
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
+ test("Test Query Merge_On_Read Read_Optimized table") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | partitioned by (ts)
+ | location '$tablePath'
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql("set hoodie.parquet.max.file.size = 10000")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+ spark.sql(s"update $tableName set price = 11 where id = 1")
+ spark.sql(s"update $tableName set price = 21 where id = 2")
+ spark.sql(s"update $tableName set price = 31 where id = 3")
+ spark.sql(s"update $tableName set price = 41 where id = 4")
+
+ // expect that all complete parquet files can be scanned
+ assertQueryResult(4, tablePath)
+
+ // async schedule compaction job
+ spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
+ .collect()
+
+ // expect that all complete parquet files can be scanned with a pending compaction job
+ assertQueryResult(4, tablePath)
+
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+
+ // expect that all complete parquet files can be scanned with a pending compaction job
+ assertQueryResult(5, tablePath)
+
+ // async run compaction job
+ spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
+ .collect()
+
+ // assert that all complete parquet files can be scanned after compaction
+ assertQueryResult(5, tablePath)
+ }
+ }
+
+ def assertQueryResult(expected: Any,
+ tablePath: String): Unit = {
+ val actual = spark.read.format("org.apache.hudi").option("hoodie.datasource.query.type", "read_optimized").load(tablePath).count()
+ assertResult(expected)(actual)
+ }
+}