You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/03/31 06:25:26 UTC
[hudi] branch master updated: [HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ce45f7f [HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)
ce45f7f is described below
commit ce45f7f129e4d83705fc6a2a3dc6cfd7414480e1
Author: Yuwei XIAO <yw...@gmail.com>
AuthorDate: Thu Mar 31 14:24:59 2022 +0800
[HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)
---
.../TestHoodieSparkMergeOnReadTableCompaction.java | 56 ++++++++++++++++++++++
.../common/table/view/FileSystemViewManager.java | 2 +-
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index f4f47d3..3b30c5b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -21,6 +21,9 @@ package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -43,12 +46,16 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
@@ -56,6 +63,17 @@ import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFunctionalTestHarness {
+ private static Stream<Arguments> writeLogTest() {
+ // enable metadata table, enable embedded timeline server
+ Object[][] data = new Object[][] {
+ {true, true},
+ {true, false},
+ {false, true},
+ {false, false}
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
private HoodieTestDataGenerator dataGen;
private SparkRDDWriteClient client;
private HoodieTableMetaClient metaClient;
@@ -104,6 +122,44 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
Assertions.assertEquals(300, readTableTotalRecordsNum());
}
+ @ParameterizedTest
+ @MethodSource("writeLogTest")
+ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .forTable("test-trip-table")
+ .withPath(basePath())
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withAutoCommit(true)
+ .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+ .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+ .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ client = getHoodieWriteClient(config);
+
+ final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+ // initialize 100 records
+ client.upsert(writeRecords, client.startCommit());
+ // update 100 records
+ client.upsert(writeRecords, client.startCommit());
+ // schedule compaction
+ client.scheduleCompaction(Option.empty());
+ // delete 50 records
+ List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+ JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+ client.delete(deleteRecords, client.startCommit());
+ // insert the same 100 records again
+ client.upsert(writeRecords, client.startCommit());
+ Assertions.assertEquals(100, readTableTotalRecordsNum());
+ }
+
private long readTableTotalRecordsNum() {
return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(),
Arrays.stream(dataGen.getPartitionPaths()).map(p -> Paths.get(basePath(), p).toString()).collect(Collectors.toList()), basePath()).size();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 32c7125..4683fd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -162,7 +162,7 @@ public class FileSystemViewManager {
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
if (metadataConfig.enabled()) {
ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
- return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+ return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
metadataSupplier.get());
}
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());