Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/02 01:08:12 UTC
[hudi] branch master updated: [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f171496de2 [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)
f171496de2 is described below
commit f171496de244992958fd3fd22fbcd2a7dc62c7a2
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Oct 2 09:08:03 2022 +0800
[HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)
---
.../table/view/AbstractTableFileSystemView.java | 50 ++++++
.../apache/hudi/source/IncrementalInputSplits.java | 178 +++++++++------------
.../hudi/source/StreamReadMonitoringFunction.java | 5 +-
.../apache/hudi/table/format/TestInputFormat.java | 70 +++++++-
4 files changed, 197 insertions(+), 106 deletions(-)
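For context, the read.streaming.skip_compaction option lets a Flink streaming reader skip the base files produced by compaction and tail only the log files, so records are consumed in write order, much like a message queue. Below is a minimal sketch of how the option might be enabled on a streaming source; the table schema, table name and path are placeholders and are not part of this commit.

// Sketch only: schema, table name and path are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SkipCompactionReadExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Hudi MOR source in streaming mode; 'read.streaming.skip_compaction' tells the
    // reader to ignore the base files written by compaction and read only the log files.
    tEnv.executeSql(
        "CREATE TABLE hudi_source (\n"
        + "  uuid VARCHAR(20),\n"
        + "  name VARCHAR(10),\n"
        + "  age INT,\n"
        + "  ts TIMESTAMP(3),\n"
        + "  `partition` VARCHAR(20)\n"
        + ") PARTITIONED BY (`partition`) WITH (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = 'file:///tmp/hudi_table',\n"
        + "  'table.type' = 'MERGE_ON_READ',\n"
        + "  'read.streaming.enabled' = 'true',\n"
        + "  'read.streaming.skip_compaction' = 'true'\n"
        + ")");

    tEnv.executeSql("SELECT * FROM hudi_source").print();
  }
}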
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index ed4bfd7601..976217ae07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -713,6 +713,33 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
}
+ /**
+ * Streams all "merged" file slices before or on an instant time
+ * for a MERGE_ON_READ table with an index that can index log files (which means it writes pure logs first).
+ *
+ * <p>In the streaming read scenario, for better reading efficiency, the user can choose to skip the
+ * base files that are produced by compaction. That is to say, users are allowed to consume only from
+ * these partitioned log files; the log files keep the record sequence just like a normal message queue.
+ *
+ * <p>NOTE: only the local view is supported.
+ *
+ * @param partitionStr Partition Path
+ * @param maxInstantTime Max Instant Time
+ */
+ public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
+ try {
+ readLock.lock();
+ String partition = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partition);
+ return fetchAllStoredFileGroups(partition)
+ .filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
+ .map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, maxInstantTime))
+ .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
try {
@@ -1076,6 +1103,29 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return fileSlice;
}
+ /**
+ * Returns the file slice with the log files of all the file slices merged.
+ *
+ * @param fileGroup File group to which the file slices belong
+ * @param maxInstantTime The max instant time
+ */
+ private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, String maxInstantTime) {
+ List<FileSlice> fileSlices = fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
+ if (fileSlices.size() == 0) {
+ return Option.empty();
+ }
+ if (fileSlices.size() == 1) {
+ return Option.of(fileSlices.get(0));
+ }
+ final FileSlice latestSlice = fileSlices.get(0);
+ FileSlice merged = new FileSlice(latestSlice.getPartitionPath(), latestSlice.getBaseInstantTime(),
+ latestSlice.getFileId());
+
+ // add log files from the latest slice to the earliest
+ fileSlices.forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
+ return Option.of(merged);
+ }
+
/**
* Default implementation for fetching latest base-file.
*
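For illustration, here is a minimal sketch of how a caller could use the new getAllLogsMergedFileSliceBeforeOrOn view API added above to collect the log paths of a partition. Only the view method itself comes from this commit; the helper name and the way the paths are gathered are assumptions for the example.

// Sketch only: the partition path and instant time are supplied by the caller.
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

public class LogsMergedSliceExample {
  public static List<String> logPathsBeforeOrOn(
      HoodieTableFileSystemView fsView, String partitionPath, String maxInstantTime) {
    // Each returned slice carries the log files of every slice in its file group
    // (no base file is attached), so a logs-only read plan can be built from them.
    return fsView.getAllLogsMergedFileSliceBeforeOrOn(partitionPath, maxInstantTime)
        .flatMap(FileSlice::getLogFiles)
        .sorted(HoodieLogFile.getLogFileComparator())
        .map(logFile -> logFile.getPath().toString())
        .collect(Collectors.toList());
  }
}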
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index dc10970b05..09f7054cd7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -20,6 +20,7 @@ package org.apache.hudi.source;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -220,7 +221,7 @@ public class IncrementalInputSplits implements Serializable {
: instants.get(instants.size() - 1).getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange);
+ fileStatuses, readPartitions, endInstant, instantRange, false);
return Result.instance(inputSplits, endInstant);
}
@@ -235,8 +236,9 @@ public class IncrementalInputSplits implements Serializable {
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String issuedInstant) {
+ @Nullable org.apache.hadoop.conf.Configuration hadoopConf,
+ String issuedInstant,
+ boolean cdcEnabled) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
if (commitTimeline.empty()) {
@@ -248,90 +250,15 @@ public class IncrementalInputSplits implements Serializable {
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
- instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), false);
+ // when cdc is enabled, returns an instant range with a nullable boundary
+ // to filter the reading instants on the timeline
+ instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), cdcEnabled);
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return Result.EMPTY;
}
- String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-
- Set<String> readPartitions;
- final FileStatus[] fileStatuses;
-
- if (instantRange == null) {
- // reading from the earliest, scans the partitions and files directly.
- FileIndex fileIndex = getFileIndex();
- readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
- if (readPartitions.size() == 0) {
- LOG.warn("No partitions found for reading in user provided path.");
- return Result.EMPTY;
- }
- fileStatuses = fileIndex.getFilesInPartitions();
- } else {
- List<HoodieCommitMetadata> activeMetadataList = instants.stream()
- .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
- List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
- if (archivedMetadataList.size() > 0) {
- LOG.warn("\n"
- + "--------------------------------------------------------------------------------\n"
- + "---------- caution: the reader has fall behind too much from the writer,\n"
- + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
- + "--------------------------------------------------------------------------------");
- }
- List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
- // IMPORTANT: the merged metadata list must be in ascending order by instant time
- ? mergeList(archivedMetadataList, activeMetadataList)
- : activeMetadataList;
-
- readPartitions = getReadPartitions(metadataList);
- if (readPartitions.size() == 0) {
- LOG.warn("No partitions found for reading in user provided path.");
- return Result.EMPTY;
- }
- fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
- }
-
- if (fileStatuses.length == 0) {
- LOG.warn("No files found for reading in user provided path.");
- return Result.EMPTY;
- }
-
- final String endInstant = instantToIssue.getTimestamp();
- List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange);
-
- return Result.instance(inputSplits, endInstant);
- }
-
- /**
- * Returns the incremental cdc input splits.
- *
- * @param metaClient The meta client
- * @param issuedInstant The last issued instant, only valid in streaming read
- * @return The list of incremental input splits or empty if there are no new instants
- */
- public Result inputSplitsCDC(
- HoodieTableMetaClient metaClient,
- String issuedInstant) {
- metaClient.reloadActiveTimeline();
- HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
- if (commitTimeline.empty()) {
- LOG.warn("No splits found for the table under path " + path);
- return Result.EMPTY;
- }
- List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
- // get the latest instant that satisfies condition
- final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
- final InstantRange instantRange;
- if (instantToIssue != null) {
- instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), true);
- } else {
- LOG.info("No new instant found for the table under path " + path + ", skip reading");
- return Result.EMPTY;
- }
-
- Set<String> readPartitions;
+ final Set<String> readPartitions;
final FileStatus[] fileStatuses;
if (instantRange == null) {
@@ -339,38 +266,77 @@ public class IncrementalInputSplits implements Serializable {
FileIndex fileIndex = getFileIndex();
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
if (readPartitions.size() == 0) {
- LOG.warn("No partitions found for reading in path: " + path);
+ LOG.warn("No partitions found for reading under path: " + path);
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();
if (fileStatuses.length == 0) {
- LOG.warn("No files found for reading in path: " + path);
+ LOG.warn("No files found for reading under path: " + path);
return Result.EMPTY;
}
final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, null);
+ fileStatuses, readPartitions, endInstant, null, false);
return Result.instance(inputSplits, endInstant);
} else {
- HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
- final String endInstant = instantToIssue.getTimestamp();
- Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+ // streaming read
+ if (cdcEnabled) {
+ // case1: cdc change log enabled
+ HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
+ final String endInstant = instantToIssue.getTimestamp();
+ Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+
+ if (fileSplits.isEmpty()) {
+ LOG.warn("No change logs found for reading in path: " + path);
+ return Result.EMPTY;
+ }
- if (fileSplits.isEmpty()) {
- LOG.warn("No change logs found for reading in path: " + path);
- return Result.EMPTY;
- }
+ final AtomicInteger cnt = new AtomicInteger(0);
+ List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
+ .map(splits ->
+ new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
+ splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+ .collect(Collectors.toList());
+ return Result.instance(inputSplits, endInstant);
+ } else {
+ // case2: normal streaming read
+ String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+ List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+ .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+ List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+ if (archivedMetadataList.size() > 0) {
+ LOG.warn("\n"
+ + "--------------------------------------------------------------------------------\n"
+ + "---------- caution: the reader has fall behind too much from the writer,\n"
+ + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ + "--------------------------------------------------------------------------------");
+ }
+ List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+ // IMPORTANT: the merged metadata list must be in ascending order by instant time
+ ? mergeList(archivedMetadataList, activeMetadataList)
+ : activeMetadataList;
- final AtomicInteger cnt = new AtomicInteger(0);
- List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
- .map(splits ->
- new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
- splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
- .collect(Collectors.toList());
- return Result.instance(inputSplits, endInstant);
+ readPartitions = getReadPartitions(metadataList);
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading under path: " + path);
+ return Result.EMPTY;
+ }
+ fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+
+ if (fileStatuses.length == 0) {
+ LOG.warn("No files found for reading under path: " + path);
+ return Result.EMPTY;
+ }
+
+ final String endInstant = instantToIssue.getTimestamp();
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+ fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
+
+ return Result.instance(inputSplits, endInstant);
+ }
}
}
@@ -401,12 +367,13 @@ public class IncrementalInputSplits implements Serializable {
FileStatus[] fileStatuses,
Set<String> readPartitions,
String endInstant,
- InstantRange instantRange) {
+ InstantRange instantRange,
+ boolean skipBaseFiles) {
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
return readPartitions.stream()
- .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
+ .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -421,6 +388,15 @@ public class IncrementalInputSplits implements Serializable {
.collect(Collectors.toList());
}
+ private static Stream<FileSlice> getFileSlices(
+ HoodieTableFileSystemView fsView,
+ String relPartitionPath,
+ String endInstant,
+ boolean skipBaseFiles) {
+ return skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
+ : fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
+ }
+
private FileIndex getFileIndex() {
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
if (this.requiredPartitions != null) {
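Putting the new entry point together, a rough sketch of how a caller might build the splits provider with skipCompaction and invoke the unified inputSplits method follows. The builder methods and the four-argument signature are the ones shown in this diff; the wrapper class and all input values are placeholders.

// Sketch only: all inputs come from the caller (see StreamReadMonitoringFunction and the test below).
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.source.IncrementalInputSplits;

public class StreamingSplitsExample {
  public static IncrementalInputSplits.Result pollSplits(
      HoodieTableMetaClient metaClient,
      org.apache.hadoop.conf.Configuration hadoopConf,
      Configuration conf,
      RowType rowType,
      Path path,
      String issuedInstant) {
    IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
        .rowType(rowType)
        .conf(conf)
        .path(path)
        .skipCompaction(true) // honor read.streaming.skip_compaction
        .build();
    // cdcEnabled = false takes the normal streaming-read branch, which now
    // dispatches to getAllLogsMergedFileSliceBeforeOrOn when skipCompaction is set.
    return incrementalInputSplits.inputSplits(metaClient, hadoopConf, issuedInstant, false);
  }
}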
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 1812262ab4..2ad312241e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -201,9 +201,8 @@ public class StreamReadMonitoringFunction
// table does not exist
return;
}
- IncrementalInputSplits.Result result = cdcEnabled
- ? incrementalInputSplits.inputSplitsCDC(metaClient, this.issuedInstant)
- : incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
+ IncrementalInputSplits.Result result =
+ incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant, this.cdcEnabled);
if (result.isEmpty()) {
// no new instants, returns early
return;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f066a0b702..458e024446 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
/**
* Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -341,7 +343,7 @@ public class TestInputFormat {
.build();
// default read the latest commit
- IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplitsCDC(metaClient, null);
+ IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplits(metaClient, null, null, true);
List<RowData> result = readData(inputFormat, splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -383,7 +385,7 @@ public class TestInputFormat {
.build();
// default read the latest commit
- IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplitsCDC(metaClient, null);
+ IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplits(metaClient, null, null, true);
List<RowData> result = readData(inputFormat, splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -398,6 +400,70 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
+ @Test
+ void testReadSkipCompaction() throws Exception {
+ beforeEach(HoodieTableType.MERGE_ON_READ);
+
+ org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+
+ // write base first with compaction
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(true);
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
+ .rowType(TestConfigurations.ROW_TYPE)
+ .conf(conf)
+ .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+ .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", "par3", "par4")))
+ .skipCompaction(true)
+ .build();
+
+ // default read the latest commit
+ // the compaction base files are skipped
+ IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits1.isEmpty());
+ List<RowData> result1 = readData(inputFormat, splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+ String actual1 = TestData.rowDataToString(result1);
+ String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+ assertThat(actual1, is(expected1));
+
+ // write another commit using logs and read again
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+ // read from the compaction commit
+ String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
+ conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+ IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits2.isEmpty());
+ List<RowData> result2 = readData(inputFormat, splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ String actual2 = TestData.rowDataToString(result2);
+ String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+ assertThat(actual2, is(expected2));
+
+ // write another commit using logs with separate partition
+ // so the file group has only logs
+ TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
+ // refresh the input format
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat(true);
+
+ IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits3.isEmpty());
+ List<RowData> result3 = readData(inputFormat, splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ String actual3 = TestData.rowDataToString(result3);
+ String expected3 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+ assertThat(actual3, is(expected3));
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {