Posted to commits@hudi.apache.org by si...@apache.org on 2022/03/31 06:25:26 UTC

[hudi] branch master updated: [HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ce45f7f  [HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)
ce45f7f is described below

commit ce45f7f129e4d83705fc6a2a3dc6cfd7414480e1
Author: Yuwei XIAO <yw...@gmail.com>
AuthorDate: Thu Mar 31 14:24:59 2022 +0800

    [HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)
---
 .../TestHoodieSparkMergeOnReadTableCompaction.java | 56 ++++++++++++++++++++++
 .../common/table/view/FileSystemViewManager.java   |  2 +-
 2 files changed, 57 insertions(+), 1 deletion(-)
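
In short, the one-line fix in FileSystemViewManager (second hunk below) makes the metadata-backed file system view see pending compaction instants, matching the timeline already used for the plain HoodieTableFileSystemView. A minimal sketch of the difference, assuming an initialized HoodieTableMetaClient named metaClient (the sketch is illustrative only and not part of the commit):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    class TimelineSketch {
      static void compare(HoodieTableMetaClient metaClient) {
        // Old behavior: only completed commit instants. A compaction that has
        // been scheduled but not yet finished is invisible to the view, so
        // writes landing while the compaction is pending can resolve to the
        // wrong file slices.
        HoodieTimeline before = metaClient.getActiveTimeline()
            .getCommitsTimeline().filterCompletedInstants();

        // New behavior: completed instants plus pending compaction instants,
        // i.e. the same timeline the non-metadata view is built on.
        HoodieTimeline after = metaClient.getActiveTimeline()
            .filterCompletedAndCompactionInstants();
      }
    }

The new parameterized test below drives exactly this scenario: it schedules a compaction between delta commits and then keeps writing, with the metadata table and the embedded timeline server each toggled on and off.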

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index f4f47d3..3b30c5b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -21,6 +21,9 @@ package org.apache.hudi.table.functional;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -43,12 +46,16 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
@@ -56,6 +63,17 @@ import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
 @Tag("functional")
 public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFunctionalTestHarness {
 
+  private static Stream<Arguments> writeLogTest() {
+    // enable metadata table, enable embedded time line server
+    Object[][] data = new Object[][] {
+        {true, true},
+        {true, false},
+        {false, true},
+        {false, false}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private HoodieTestDataGenerator dataGen;
   private SparkRDDWriteClient client;
   private HoodieTableMetaClient metaClient;
@@ -104,6 +122,44 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
     Assertions.assertEquals(300, readTableTotalRecordsNum());
   }
 
+  @ParameterizedTest
+  @MethodSource("writeLogTest")
+  public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .forTable("test-trip-table")
+        .withPath(basePath())
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(true)
+        .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+    client = getHoodieWriteClient(config);
+
+    final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+    // initialize 100 records
+    client.upsert(writeRecords, client.startCommit());
+    // update 100 records
+    client.upsert(writeRecords, client.startCommit());
+    // schedule compaction
+    client.scheduleCompaction(Option.empty());
+    // delete 50 records
+    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+    JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+    client.delete(deleteRecords, client.startCommit());
+    // insert the same 100 records again
+    client.upsert(writeRecords, client.startCommit());
+    Assertions.assertEquals(100, readTableTotalRecordsNum());
+  }
+
   private long readTableTotalRecordsNum() {
     return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(),
         Arrays.stream(dataGen.getPartitionPaths()).map(p -> Paths.get(basePath(), p).toString()).collect(Collectors.toList()), basePath()).size();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 32c7125..4683fd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -162,7 +162,7 @@ public class FileSystemViewManager {
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     if (metadataConfig.enabled()) {
       ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
-      return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+      return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
           metadataSupplier.get());
     }
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
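
Note that the fall-through path at the bottom of this hunk already built HoodieTableFileSystemView on the `timeline` computed at the top of the method via filterCompletedAndCompactionInstants(), so the one-line change simply brings the metadata-backed HoodieMetadataFileSystemView in line with it: both views now treat pending compaction instants as part of the visible timeline.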